#!/usr/bin/perl -w

# padb. a simple parallel debugging aid.

# For help and support visit http://padb.pittman.org.uk
# or email padb-users@pittman.org.uk

# Copyright (C) 2005-2007 Quadrics.
# Copyright (C) 2009 Ashley Pittman.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA

# Revision history
#
# Version 3.1
#  * Added a --create-secret-file option to automatically create a secret
#    file
#  * Allow the secret file to contain digits and dots as well as letters
#
# Version 3.0
#  * Full-duplex communication between inner and outer processes, padb
#    no longer simply sends request on the command line and processes
#    the response but is truly interactive between the inner and outer
#    processes.  This avails greater flexibility in what can be achieved
#    and hopefully helps with scalability as well.
#  * Enabled warnings (-w) by default.  Fixed lots of warnings, mostly
#    about comparing with undef
#  * Much more complete separation into "modes" of operation, most options
#    are now mode specific rather than simply using global variables or
#    global conf options.
#  * Overhaul of the allfns (mode) callbacks and in particular their parameters
#  * Performance improvements.
#  * Simplify the slurm_find_pids() function to just return the output
#    of scontrol listpids
#  * Take the old process-tree walking code from slurm_find_pids() and make
#    it independant and call it for all resource managers.  This allows scripts
#    which call parallel applications to be bypassed and the applications
#    themselves targetted.
#  * Added "port-range" option to limit port usage in case people try and use
#    padb with firewalls enabled.
#
# Version 2.5
#  * First Non-Quadrics version
#  * Various stability/bug fixes.
#  * Deadlock detect at the MPI Layer rather than the Elan layer
#    if running with a patched MPI (Work in progress)
#  * Completely new build and packing procedure to go with the new
#    maintainer.
#  * Added "orte" to the list of resource managers supported
#  * Don't enable local-qsnet on non-qsnet systems.
#  * inner_main() now uses callbacks for resource manager support.
#  * --signal now takes names rather than numbers.
#  * Check job is valid when using the --full-report option.
#  * Add a --proc-summary option to replace --proc-info --proc-format
#    This gives a very efficient "job top" program.

# Version 2.2
#  * Add a --core-stack option along with --core and --exe to extract stack
#    traces from core files.
#
# Version 2.1
#  * Add some magic to return complex perl data structures back from the inner
#    callback functions to the output callback function. (nfreeze/base64_encode).
#  * Add "MPI watch" functionality to allow viewing of MPI state in a vmstat
#    like fashion.
#  * Add a --list-rmgrs option to list active resource managers and their jobs.
#  * Add support for "local-qsnet" as a way of launching jobs.
#  * Add support for "local-fd" as a way of launching jobs.
#  * Add support for "mpd" as a way of launching jobs.
#  * Add support for "lsf-rms" as a way of launching jobs.  Note the lsf/rms
#    integration means this is highly unlikely to work for everyone.
#  * Add a -Olsf-job-offset option for finding lsf jobs in the rms database.
#  * Support for MPI message queues as well as libelan queues (-Q)
#  * Add a -Ominfo=<exe> option for finding the new minfo.x command.
#  * Add a -Ompi-dll=<dll.so> option for overriding the debugger dll.
#  * Extend the gdb handling code to allow further expansion in the future.
#  * Make the strip-below and strip-above functions configurable.
#  * Add support for loading settings from the environment or a config file.
#  * Add support for "local" as a resource manager to target hand-crafted capabilities.
#  * Ignore case when matching stats names.
#  * Correct printing of debug information from the inner.
#  * Try and remove warnings when run with -w (still disabled)
#  * Un-break it on slurm systems without RMS installed.
#  * Preliminary threading support (courtesy of LLNL)
#  * Show per-rail sdram usage of processes.
#  * Look at all descendant processes of slurmstepd rather than direct descendants
#    and try and avoid scripts (perl/sh/bash/xterm)
#  * Use the new scontrol listpids and %A options to squeue for finding processes
#    on slurm systems (1.2.0 and above).
#  * Don't show usage on command line errors.
#  * Only pass command line options onto the inner if it is going to handle them
#
# Version 2.0
#  * Removed the -OscriptDir option as it's no longer used, use -Oedb instead.
#  * Corrected the way tally statistics were being added.
#  * Added a --show-jobs option to show possible jobs in a resource manager
#    independent way.
#  * Added a --local-stats option to show message statistics for all processes
#    on the local node.
#  * Added a --proc-format option which allows specific entries from /proc to be
#    reported on a per-vp basis.
#  * Ported to slurm only systems where the RMS kernel module isn't present.
#  * Removed the padb-helper.pl file and folded it's functionality into padb
#    itself. Padb is now self-contained.
#  * Removed the padb.gdb file from the kit, it's generated at run-time if
#    needed.
#  * Various readability fixes and small performance improvements.
#  * Added a --kill option along with --signal which can be used to send signals
#    to any process in the parallel job.
#
# Version 1.6
#  * Add a --proc-info option to show the contents of files from /proc for a
#    given rank
#  * Increase the RMS_EXITTIMEOUT value from 10 to 30 seconds and make it a
#    configuration option.
#
# Version 1.5
#  * Try and load edb from where padb is installed. This should allow it to run
#    on elan3 systems where the qsnetdefault link is set to elan3.
#  * GNAT:8110 Proper use of English in the group deadlock detection report.
#  * Target the correct process if there are multiple processes for each vp.
#    Use the pid of the process which called elan_baseInit()
#  * GNAT 7945: Fix messages on stderr about integer overflow on 32 bit machines
#  * Remove warnings when -w is turned on.
#  * Re-work the stack trace tree generation code do work via a intermediate
#    data structure to make the code easier to parse.
#  * Report errors loading stats from a running job cleanly.
#  * Better backwards compatibility with older RMS releases.
#  * Add a padb-treeview script to the release, this takes most of it's code
#    from padb and uses tk to provide the user with a X based view of the stack
#    traces.
#  * Changes to edb so the stats loading code can run on elan3 systems.
#
# Version 1.4
#  * Bumped version number to 1.4
#  * Change the format of tree based stack traces, it now uses a more logical
#    indention style.
#  * Discover and report if application stats are incomplete.
#  * Allow the use of -r with -s to view statistics from an individual process.
#    Update -S (which does the same thing) to parse the stats in padb as well.
#  * Improved error handling in the case where jobs complete whilst padb is
#    running, sample the job state before and after going parallel and do the
#    right thing accordingly.
#  * Much improved error output, only report an error if something bad happened.
#  * Changes to the code as required to enable padb to run cleanly with
#    warnings (-w) enabled.
#  * Added a -Ostats-name= option to allow the extraction of one specific
#    statistic from the command line.
#  * Create separate file descriptors for stdout and stderr when running in
#    parallel to make it more resilient.
#
# Version 1.3
#  * Strip stack traces below main when possible, add a --nostrip-below-main
#    option to turn this off.
#  * Strip stack traces above elan_waitWord when possible, add a
#    --nostrip-above-wait option to turn this off.
#  * Added a -Ogdb-retry-count=N option. Defaults to three as before but is now
#    tunable.
#  * Parse communication statistics in padb directly now rather then relying on
#    edb to do it for us
#  * Allow reading of stats from file (-s -i )
#  * Perform group deadlock detection in padb directly rather than in edb,
#    improved the output and handling of corner cases.
#  * Initial version of a "one process per line" method of statistics reporting.
#  * Better catching and reporting of errors when running parallel programs.
#  * Bumped the version number to 1.3
#
# Version 1.2
#  * Converted padb to use long command line options. The short ones still work
#    but now have long alternatives
#  * Removed the need to set -OscriptDir= when running in non-default locations
#  * Added a --full-report=<jobId> option to gather all possible information
#  * General tidy ups in the stack trace tree generation code.
#  * Now reports processes that aren't present when generating stack traces.
#  * Now reports errors properly when there are errors launching the parallel
#    job
#  * Calls edb directly rather than using a helper script when possible
#    (statistics reports).
#  * Incremented version number from 1.0 to 1.2.
#

# TODO:
#
# * More testing with -w turned on.
# * Multi-pass argument handling, --kill also accepts --signal for example,
#   this should really be done at the getopt layer.  Also proper usage
#   info for these secondary args.
# * Paramater checking of secondary args, signal has a hacky implementation and
#   port-range doesn't have any checking currently.
# * libunwind support?  lighter weight than gdb and possibly more reliable.
# * Maybe PMI would help?
# * POD? generated man page?
# * mode specific defaults, for example --mpi-watch should enable --watch
#   -Owatch-clears-screen=0
# * Make -q fallback to -Q if tports are not available
# * ???
# * Allow ranges of ranks to be specified.

###############################################################################

use strict;
use Getopt::Long;
use File::Basename;
use IPC::Open3;
use Cwd;
use Data::Dumper;
use Storable qw(dclone nfreeze thaw);
use Sys::Hostname;
use File::Temp qw(tempfile);
use MIME::Base64;
use Config;
use IO::Socket;
use IO::Select;
use Carp;

###############################################################################
#
# Header.
#
###############################################################################

# Formatted with the command 'perltidy -b -ce -w padb' to maintain a
# vaguely readable form.

# This (large) source file contains a number of loosely separated segments,
# namely...

# Header.
# Resource manager setup
# Config options and defaults
# Usage and version
# Globals
# Elan statistics.
# Group deadlock detection
# Local (per node) stats.
# Stack trace tree compression.
# RMS support.
# Slurm support.
# Resource manager support.
# Output formatting
# Data collection (parallel and from file).
# Outer main
# Inner
# Main.

my $prog    = basename $0;
my $version = "3.1";

my %conf;

my $secret;

# Config options the inner knows about, only forward options if they are in this list.
my @inner_conf = qw(edb edbopt minfo rmgr scripts slurm_job_step verbose);

# More config options the inner knows about, these are forwarded on the command line
# rather than over the sockets.
my @inner_conf_cmd = qw(port_range outer);

###############################################################################
#
# Resource manager setup
#
###############################################################################

# A hash of supported resource managers, each of which provides a number of
# functions for querying the state of the machine.  This keeps the core
# of the code tidy.   Note that this is only for the "outer" instance of the
# program, the inner version does things differently.

# Function        Args Returns   Required Description
# is_installed    -    Bool      yes      Check for being installed and running.
# get_active_jobs user List      yes      Return list of all active job for user.
# is_job_running  job  Bool      no       Check if a given job is running.
# job_to_key      job  key       no       Convert from jobId to shm key.
# setup_pcmd      job  cmd|ncpus yes      Command needed to launch shadow jobs.
# cleanup_pcmd    -    -         no       Cleans up and temporary files.
# find_pids       job  -         maybe    Called on the inner to locate pids.

# inner_rmgr      var  n/a       no       Resource manager to masquerade as.

my %rmgr;

$rmgr{rms} = {
    'is_installed'    => \&rms_is_installed,
    'get_active_jobs' => \&rms_get_jobs,
    'job_is_running'  => \&rms_job_is_running,
    'job_to_key'      => \&rms_job_to_key,
    'setup_pcmd'      => \&rms_setup_pcmd,
    'find_pids'       => \&rms_find_pids,
};

$rmgr{mpd} = {
    'is_installed'           => \&mpd_is_installed,
    'get_active_jobs'        => \&mpd_get_jobs,
    'setup_pcmd'             => \&mpd_setup_pcmd,
    'cleanup_pcmd'           => \&mpd_cleanup_pcmd,
    'find_pids'              => \&mpd_find_pids,
    'require_inner_callback' => 1,
};

$rmgr{orte} = {
    'is_installed'    => \&open_is_installed,
    'get_active_jobs' => \&open_get_jobs,
    'setup_pcmd'      => \&open_setup_pcmd,
    'cleanup_pcmd'    => \&open_cleanup_pcmd,
    'find_pids'       => \&open_find_pids,
};

$rmgr{"lsf-rms"} = {
    'is_installed'    => \&lsf_is_installed,
    'get_active_jobs' => \&lsf_get_jobs,
    'setup_pcmd'      => \&lsf_setup_pcmd,
    'inner_rmgr'      => "rms",
};

$rmgr{slurm} = {
    'is_installed'           => \&slurm_is_installed,
    'get_active_jobs'        => \&slurm_get_jobs,
    'job_is_running'         => \&slurm_job_is_running,
    'setup_pcmd'             => \&slurm_setup_pcmd,
    'find_pids'              => \&slurm_find_pids,
    'require_inner_callback' => 1,
};

$rmgr{local} = {
    'get_active_jobs'        => \&local_get_jobs,
    'job_is_running'         => \&local_job_is_running,
    'setup_pcmd'             => \&local_setup_pcmd,
    'find_pids'              => \&local_find_pids,
    'require_inner_callback' => 1,
};

$rmgr{"local-qsnet"} = {
    'is_installed'           => \&local_q_is_installed,
    'get_active_jobs'        => \&local_q_get_jobs,
    'job_is_running'         => \&local_job_is_running,
    'setup_pcmd'             => \&local_setup_pcmd,
    'inner_rmgr'             => "local",
    'require_inner_callback' => 1,
};

$rmgr{"local-fd"} = {
    'get_active_jobs'        => \&local_fd_get_jobs,
    'job_is_running'         => \&local_job_is_running,
    'setup_pcmd'             => \&local_setup_pcmd,
    'inner_rmgr'             => "local",
    'require_inner_callback' => 1,
};

###############################################################################
#
# Config options
#
###############################################################################

# If changing any of these defaults also check the inner code as some
# of these settings affect that program as well and padb will only
# pass on settings on the command line, not the entire config hash.
# The reason they are listed here as well is so that padb -O help
# works and gives the correct defaults.

my %allfns;

my %cinner;        # Config options to be passed to inner.
my %cinner_cmd;    # Config options to be passed to inner.

my $rem_jobid;

# Debug options.
$conf{verbose} = 0;

# Valid values are "none" "missing" or "all".  Anything not recognised
# is treated as "all".
$conf{check_signon} = "all";

# Output options.
$conf{interval}            = 10;
$conf{watch_clears_screen} = 1;
$conf{scripts}             = "bash,sh,dash,ash,perl,xterm";
$conf{lsf_job_offset}      = 1;
$conf{local_fd_name}       = "/dev/null";
$conf{inner_callback}      = 0;

# These two are used by deadlock and QsNet group
# code, they need migrating in the group code
# when I have access to a test system again.
#$conf{"show-group-members"}  = 0;
#$conf{"show-all-groups"}     = 0;

# Tuning options.
$conf{prun_timeout}     = 120;
$conf{prun_exittimeout} = 120;
$conf{rmgr}             = undef;

$conf{slurm_job_step} = 0;

# These settings are passed onto inner only.
$conf{edbopt} = "";

$conf{edb}   = find_edb();
$conf{minfo} = find_minfo();

# Option to define a list of ports used by padb.
$conf{port_range} = undef;

$conf{tree_width} = 4;

my $norc       = 0;
my $configfile = "/etc/padb.conf";

# Look for edb in the default install location only.
sub find_edb {
    return "/usr/lib/qsnet/elan4/bin/"
      if ( -d "/usr/lib/qsnet/elan4/bin/" );
    return "/usr/lib64/qsnet/elan4/bin/"
      if ( -d "/usr/lib64/qsnet/elan4/bin/" );
    return "edb";
}

# Look for minfo.x in the same directory as padb.
sub find_minfo {
    my $dir = dirname($0);
    return "$dir/minfo.x";
}

###############################################################################
#
# usage and version.
#
###############################################################################

sub show_version {
    printf("$prog version $version\n\n");
    printf("Written by Ashley Pittman\n");
    printf("http://padb.pittman.org.uk\n");
    exit 0;
}

my $usage = <<EOF;
Usage: padb [-hv] [-c|-C|-t] -g|-q|-s|-x|-X [-O <opt>=<val>
            [,<opt>=<val>...]] [-i <file>] [-r <rank>] [-u <user>]
            -a|-A|<jobid ...>

-a --all               report on all running jobs for user.
-A --any               report on a running job for user.
-u --user=<USER>       report on jobs for username=<user>.

-r --rank=<RANK>       report only on processes <RANK>.
   --group-id=<ID>     report only on group <ID>.

XXXX
   --full-report=<JOBID> Generate a full report of job state.

   --nostrip-below-main Don\'t strip stack traces below main.
   --nostrip-above-wait Don\'t strip stack traces about elan_waitWord.

   --proc-format       Specify information to show about processes.

-c --compress          Use dshbac -c format.
-C --compress-long     Use other dshbak format.
-t --tree              Use tree based output for stack traces.
-i --input-file=FILE   Read input from file.

   --watch             

-O [opt1=val,<opt2=val>] Set internal config options for padb, advanced use only.
  Options in this version (these are liable to change)
  Use -Ohelp for showing current settings

  General options:
  verbose              Set verbosity level.
  edb                  Full path to edb

  Slurm only options
  slurm-job-step       Job step to target.

  RMS only options
  prun-timeout         Timeout to use when launching parallel job.

  Stack trace options:
  gdb-retry-count      Number of times to try getting a 'good' stack trace from gdb.
  stack-show-params    Show function parameters in stack traces.
  stack-show-locals    Show locals in stack traces.

  Statistics options:
  stats-short          Turn on "one process per line" stats reporting code.
  stats-sort-key       Sort stats by <key>.
  stats-reverse        Reverse order when showing stats.
  stats-name           Only report the value of a single stat.

  Group deadlock detection options:
  show-group-members   Show group to vp translations in the group code.
  show-all-groups      Report on all groups in a job.

  Watch options:
  interval             Refresh rate.

-v --verbose           Verbose.
-V --version           Show version number and exit.
-h --help              print this usage message.
EOF

sub usage {
    chomp $usage;

    my $extra = "";
    $extra .= "Modes of operation\n";
    foreach my $arg ( sort( keys %allfns ) ) {
        next unless ( defined $allfns{$arg}{help} );
        next if ( defined $allfns{$arg}{qsnet} );
        if ( defined $allfns{$arg}{arg_short} ) {
            $extra .= "-$allfns{$arg}{arg_short}";
        } else {
            $extra .= "  ";
        }
        $extra .= sprintf( " --%-18s%s.\n",
            $allfns{$arg}{arg_long},
            $allfns{$arg}{help} );
    }

    $extra .= "\nQsNet specific modes\n";
    foreach my $arg ( sort( keys %allfns ) ) {
        next unless ( defined $allfns{$arg}{help} );
        next unless ( defined $allfns{$arg}{qsnet} );
        if ( defined $allfns{$arg}{arg_short} ) {
            $extra .= "-$allfns{$arg}{arg_short}";
        } else {
            $extra .= "  ";
        }
        $extra .= sprintf( " --%-18s%s.\n",
            $allfns{$arg}{arg_long},
            $allfns{$arg}{help} );
    }

    $usage =~ s!XXXX!$extra!;

    print STDERR <<EOF;
$usage
EOF
    exit 1;
}

###############################################################################
#
# Globals.
#
###############################################################################

my $user = getpwuid($<);
my $rank_rng;

my @target_groups;
my $all;
my $any;

# Number of functions provided on the command line from the allfns hash.
my $have_allfns_option = 0;

my $full_report;
my $core_stack;
my $list_rmgrs;
my $create_secret;
my $watch;
my $local_stats;
my $show_jobs;

my $core_name;
my $exe_name;

my $input_file;
my $compress;
my $compress_C;
my $tree;

my @config_options;

my %ic_names;
my %ic_names_cmd;

# Populated in the outer args section so that outer code
# can access secondary comamnd line argunments by name.
my %secondary_args;

# Debugging: this function is called periodically with
# a mode, an abritary ref and a string, it can either print simply
# the string or call dumper on the ref as well.
# Enable with --debug=type1,type2=all
my %debugModes;
my $start_time = time();

sub debug_log {
    my ( $type, $handle, $str, @params ) = @_;
    if ( not exists $debugModes{$type} ) {
        printf("Unknown debug mode: $type\n");
        exit(1);
    }
    return unless $debugModes{$type};
    my $time = time() - $start_time;
    printf( "DEBUG ($type): %3d: $str\n", $time, @params );
    return if $debugModes{$type} eq "basic";
    return unless defined $handle;
    print Dumper $handle;
}

# Valid debug modes, a full list is maintained here so using unexpected
# ones can generate warnings.
$debugModes{full_duplex} = undef;
$debugModes{show_cmd}    = undef;
$debugModes{all}         = undef;
$debugModes{tree}        = undef;
$debugModes{verbose}     = undef;
$debugModes{signon}      = undef;
$debugModes{rmgr}        = undef;
$debugModes{ctree}       = undef;
$debugModes{tdata}       = undef;

sub parse_args_outer {

    Getopt::Long::Configure("bundling");
    my $debugflag;

    my @ranks;

    my %optionhash = (
        "verbose|v+"          => \$conf{verbose},
        "user|u=s"            => \$user,
        "rank|r=s"            => \@ranks,
        "group-id=s"          => \@target_groups,
        "help|h"              => \&usage,
        "all|a"               => \$all,
        "any|A"               => \$any,
        "version|V"           => \&show_version,
        "compress|c"          => \$compress,
        "compress-long|C"     => \$compress_C,
        "tree|t"              => \$tree,
        "input-file|file|i=s" => \$input_file,
        "config-option|O=s"   => \@config_options,
        "full-report=s"       => \$full_report,
        "core-stack"          => \$core_stack,
        "core=s"              => \$core_name,
        "exe=s"               => \$exe_name,
        "list-rmgrs"          => \$list_rmgrs,
        "watch"               => \$watch,
        "local-stats"         => \$local_stats,
        "show-jobs"           => \$show_jobs,
        "norc"                => \$norc,
        "config-file=s"       => \$configfile,
        "debug=s"             => \$debugflag,
        'create-secret-file'  => \$create_secret,
    );

    my %config_hash;
    foreach my $arg ( keys %allfns ) {
        $optionhash{ $allfns{$arg}{arg} } = \$config_hash{$arg};
        if ( defined $allfns{$arg}{secondary} ) {
            foreach my $sec ( @{ $allfns{$arg}{secondary} } ) {
                $sec->{value} = $sec->{default};
                $optionhash{ $sec->{arg} } = \$sec->{value};
            }
        }
        if ( defined $allfns{$arg}{options_i} ) {
            foreach my $o ( keys( %{ $allfns{$arg}{options_i} } ) ) {
                $conf{mode_options}{$arg}{$o} = $allfns{$arg}{options_i}{$o};
                $conf{mode_options_reverse}{$o}{$arg} = 1;
            }
        }
    }

    GetOptions(%optionhash) or exit(1);

    if ( defined $debugflag ) {
        foreach my $f ( split( ",", $debugflag ) ) {
            my ( $name, $v ) = split( "=", $f );
            if ( exists $debugModes{$name} ) {
                $debugModes{$name} = defined($v) ? $v : "basic";
            } else {
                printf("Attempt to set unknown debug flag \"$name\".\n");
            }
        }
        if ( $debugModes{all} ) {
            foreach my $mode ( keys(%debugModes) ) {
                if ( not defined $debugModes{$mode} ) {
                    $debugModes{$mode} = $debugModes{all};
                }
            }
        }
    }

    my $mode;

    foreach my $arg ( keys %config_hash ) {
        next unless defined $config_hash{$arg};
        $mode = $arg;
        $have_allfns_option++;
    }

    if (@ranks) {
        $rank_rng = rng_convert_from_user( shift(@ranks) );

        foreach my $rank (@ranks) {
            rng_merge( $rank_rng, rng_convert_from_user($rank) );
        }
    }

    # Put the args in a hash so that they can be referenced by name.
    if ( defined $mode and defined $allfns{$mode}{secondary} ) {
        foreach my $sec ( @{ $allfns{$mode}{secondary} } ) {
            $secondary_args{ $sec->{arg_long} } = $sec->{value};
        }
    }

    return $mode;
}

###############################################################################
#
# Elan statistics.
#
###############################################################################

# Work around problems with the "hex" function and whilst we are
# at it avoid warnings as well.
# Unfortunately hex can't deal with anything bigger than 2^31 without
# giving an error so simply +0 on the string to convert it to a int
# cleanly (GNAT 7945).
sub _hex {
    my $str = shift;
    if ( not defined $str ) {
        return 0;
    } elsif ( $str eq "0xffffffffffffffff" ) {
        return -1;
    } else {

        if ( length $str < 10 ) {
            return hex($str);
        }

        # It was hard to write, it's supposed to be hard to read.

        $str =~ s/\A0x//;
        my $lower = hex( "0x" . substr( "0" x 8 . $str, -8 ) );
        my $upper = hex( "0x" . substr( "0" x 16 . $str, -16, 8 ) );
        $lower += ( 0x10000000 * 0x10 * $upper );

        return $lower;
    }
}

sub sum_attr {
    my ( $current, $sum_so_far ) = @_;

    if ( defined $sum_so_far->{'raw'}[0]
        and $sum_so_far->{'raw'}[0] != $current->{'raw'}[0] )
    {
        $sum_so_far->{'raw'}[0] = undef;
    }

    return $sum_so_far;
}

sub sum_bin {
    my ( $current, $sum_so_far ) = @_;

    for ( my $j = 0 ; $j < 32 ; $j++ ) {
        $sum_so_far->{'raw'}[$j] += $current->{'raw'}[$j];
    }

    #check min
    if (
        ( $sum_so_far->{'raw'}[32] == -1 )
        or (    ( $current->{'raw'}[32] != -1 )
            and ( $current->{'raw'}[32] < $sum_so_far->{'raw'}[32] ) )
      )
    {
        $sum_so_far->{'raw'}[32] = $current->{'raw'}[32];
    }

    #check max
    if ( $current->{'raw'}[33] > $sum_so_far->{'raw'}[33] ) {
        $sum_so_far->{'raw'}[33] = $current->{'raw'}[33];
    }

    #total
    $sum_so_far->{'raw'}[34] += $current->{'raw'}[34];

    return $sum_so_far;
}

sub sum_counter {
    my ( $current, $sum_so_far ) = @_;

    $sum_so_far->{'raw'}[0] += $current->{'raw'}[0];
    return $sum_so_far;
}

sub sum_tally {
    my ( $current, $sum_so_far ) = @_;

    for ( my $j = 0 ; $j < 3 ; $j++ ) {
        $sum_so_far->{'raw'}[$j] += $current->{'raw'}[$j];
    }

    return $sum_so_far;
}

my @scales = (
    "Bytes",     "Kilobytes", "Megabytes", "Gigabytes",
    "Terabytes", "Petabytes", "Exabytes"
);

my @bin_names = (
    "0 bytes",   "1 byte",    "2 bytes",   "4 bytes",
    "8 bytes",   "16 bytes",  "32 bytes",  "64 bytes",
    "128 bytes", "256 bytes", "512 bytes", "1kb",
    "2kb",       "4kb",       "8kb",       "16kb",
    "32kb",      "64kb",      "128kb",     "256kb",
    "512kb",     "1mb",       "2mb",       "4mb",
    "8mb",       "16mb",      "32mb",      "64mb",
    "128mb",     "256mb",     "512mb",     "overflow"
);

sub show_counter {
    my ($d) = @_;

    my $ret = "";
    my $toshow;
    foreach my $counter ( sort keys %{$d} ) {

        if ( $d->{$counter}{raw}[0] != 0 or $conf{show_all_stats} ) {
            if ( defined $toshow ) {
                $ret .=
"  Counter: '$d->{$toshow}{name}' = '$d->{$toshow}{raw}[0]','$d->{$counter}{name}' = '$d->{$counter}{raw}[0]'\n";
                undef $toshow;
            } else {
                $toshow = $counter;
            }
        }
    }

    if ( defined $toshow ) {
        $ret .= "  Counter: '$d->{$toshow}{name}' = '$d->{$toshow}{raw}[0]'\n";
    }

    return $ret;
}

sub show_attr {

    # Hopefully have an array at this point.
    my ($d) = @_;

    my $ret = "";
    my $toshow;
    foreach my $attr ( sort keys %{$d} ) {
        next unless defined $d->{$attr}{raw}[0];
        if ( defined $toshow ) {
            $ret .=
"  Attribute: '$d->{$toshow}{name}' = '$d->{$toshow}{raw}[0]', '$d->{$attr}{name}' = '$d->{$attr}{raw}[0]'\n";
            undef $toshow;
        } else {
            $toshow = $attr;
        }

    }
    if ( defined $toshow ) {
        $ret .=
          "  Attribute: '$d->{$toshow}{name}' = '$d->{$toshow}{raw}[0]'\n";

    }
    return $ret;
}

sub show_tally {
    my ($d) = @_;

    my $ret = "";
    foreach my $tally ( sort keys %{$d} ) {
        if ( $d->{$tally}{raw}[0] or $conf{show_all_stats} ) {
            $ret .= sprintf(
                "%16s: Total: %d Active: %d HWM: %d\n",
                $d->{$tally}{name},   $d->{$tally}{raw}[0],
                $d->{$tally}{raw}[1], $d->{$tally}{raw}[2]
            );
        }
    }
    return $ret;
}

sub show_bin {
    my ($d) = @_;

    my $ret = "";
    foreach my $bin ( sort keys %{$d} ) {

        if (   $d->{$bin}{raw}[0] || $d->{$bin}{raw}[34]
            or $conf{show_all_stats} )
        {
            my $total = $d->{$bin}{raw}[34];
            my $scale = 0;

            while ( $total > 1024 ) {
                $total /= 1024;
                $scale++;
            }

            $ret .= sprintf(
"%16s: min $d->{$bin}{raw}[32] max $d->{$bin}{raw}[33] total $d->{$bin}{raw}[34] (%0.2f $scales[$scale])\n",
                $d->{$bin}{name}, $total );

            my @vals;
            for ( my $j = 0 ; $j < 32 ; $j++ ) {
                if ( $d->{$bin}{raw}[$j] or $conf{show_all_stats} > 1 ) {
                    push(
                        @vals,
                        sprintf( "%9s: %10d",
                            $bin_names[$j], $d->{$bin}{raw}[$j] )
                    );

                    if ( $#vals == 2 ) {
                        $ret .= sprintf( "  %s\n", join( " ", @vals ) );
                        undef @vals;
                    }
                }
            }
            if ( $#vals != -1 ) {
                $ret .= sprintf( "  %s\n", join( " ", @vals ) );
                undef @vals;
            }
        }
    }
    return $ret;
}

# These must stay in the correct order, that is the order
# they appear in shared memory.
my @stat_types = ( "Counter", "Tally", "Bin", "Attribute" );

my @display_order = qw(Attribute Counter Tally Bin);

my %stat_types2 = (
    Counter =>
      { size => "1", displayfn => \&show_counter, sumfn => \&sum_counter },
    Tally => { size => "3",  displayfn => \&show_tally, sumfn => \&sum_tally },
    Bin   => { size => "35", displayfn => \&show_bin,   sumfn => \&sum_bin },
    Attribute => { size => "1", displayfn => \&show_attr, sumfn => \&sum_attr },
);

sub parse_header {
    my ($block) = @_;
    my @a = split( ",", $block );

    my @header;

    if ( $a[0] ne "ELAN STATS" or $a[1] ne "falcon" ) {
        return undef;
    }

    my $index;
    for ( $index = 0 ; $index < 4 ; $index++ ) {
        $header[$index] = $a[$index];
    }

    while ( $index < $#a ) {
        $header[ $a[$index] ] = $a[ $index + 1 ];
        $index += 2;
    }

    return \@header;
}

# Convert from subsystem ID to name.
sub get_sub_name {
    my ( $id, $header ) = @_;
    $id *= 2;
    $id += 4;
    return $header->[$id];
}

# Convert from subsystem ID and stat type # to count.
sub get_sub_stat_count {
    my ( $id, $type, $header ) = @_;

    # Check for an invalid subsystem number.
    if ( $id >= _hex $header->[3] ) {
        return 0;
    }

    # Check for an invalid stat type.
    if ( $type >= _hex $header->[2] ) {
        return 0;
    }

    # Skip over the four entry header and expand.
    $id *= 2;
    $id += 4;

    # Move from subsystem name to offset.
    $id++;

    # Follow the offset.
    $id = $header->[$id];

    # Move to the correct type.
    $id += $type;
    return $header->[$id];
}

# Params:
# $id              Index of this subsystem.
# $type            This stat type.
# $idx             Number of this stat.
sub get_sub_stat_name {
    my ( $id, $type, $idx, $header ) = @_;

    # Check for an invalid subsystem number.
    if ( $id >= _hex $header->[3] ) {
        return 0;
    }

    # Check for an invalid stat type.
    if ( $type >= _hex $header->[2] ) {
        return 0;
    }

    # Skip over the four entry header and expand.
    $id *= 2;
    $id += 4;

    # Move from subsystem name to offset.
    $id++;

    # Follow the offset.
    $id = $header->[$id];

    my $offset = $id;

    # Header[2] is the number of stats type's.  4 currently.
    $id += _hex $header->[2];

    for ( my $i = 0 ; $i < $type ; $i++ ) {
        $id += $header->[ $offset + $i ];
    }

    return $header->[ $id + $idx ];
}

sub find_rail {
    my $r = shift;

    my $rail = _hex $r;

    if ( $rail == -1 ) {
        return "ELAN_RAIL_ALL";
    } else {
        return $rail;
    }
}

sub parse_content {
    my ( $block, $header ) = @_;
    my @a = split( ",", $block );
    my $index = 0;

    my @raw_data;

    return undef if ( $#a < 5 );

    for ( $index = 0 ; $index < 4 ; $index++ ) {
        $raw_data[$index] = _hex( $a[$index] );
    }

    while ( $index < $#a ) {
        $raw_data[ $a[$index] ] = $a[ $index + 1 ];
        $index += 2;
    }

    my %process_details;

    $process_details{vp}      = $raw_data[0];
    $process_details{nvp}     = $raw_data[1];
    $process_details{localid} = $raw_data[2];
    $process_details{nlocal}  = $raw_data[3];

    my $instBase = 4;

    while ( $instBase != 0 ) {
        my $sysId = _hex( $raw_data[$instBase] );
        my $sysname = get_sub_name( $sysId, $header );

        my %inst;

        $inst{sysId}      = _hex( $raw_data[$instBase] );
        $inst{name}       = get_sub_name( $sysId, $header );
        $inst{id}         = _hex $raw_data[ $instBase + 1 ];
        $inst{handle}     = $raw_data[ $instBase + 2 ];
        $inst{stats}      = _hex $raw_data[ $instBase + 6 ];
        $inst{rail}       = find_rail $raw_data[ $instBase + 4 ];
        $inst{next}       = _hex $raw_data[ $instBase + 5 ];
        $inst{valid}      = _hex $raw_data[ $instBase + 3 ];
        $inst{debugFlags} = $raw_data[ $instBase + 7 ];

        if ( $inst{stats} ) {
            my %stats;

            my $offset = $inst{stats};

            for ( my $type = 0 ; $type < $#stat_types + 1 ; $type++ ) {
                my $typename = $stat_types[$type];
                my $count = get_sub_stat_count( $inst{sysId}, $type, $header );

                next if $count eq 0;
                my %type;
                for ( my $idx = 0 ; $idx < $count ; $idx++ ) {
                    my %data;
                    my @raw;
                    for (
                        my $value = 0 ;
                        $value < $stat_types2{$typename}{size} ;
                        $value++
                      )
                    {
                        $raw[$value] = _hex $raw_data[$offset];
                        $offset++;
                    }
                    $data{name} =
                      get_sub_stat_name( $inst{sysId}, $type, $idx, $header );
                    $data{raw} = \@raw;

                    $type{ $data{name} } = \%data;
                }
                $stats{$typename} = \%type;
            }
            $inst{statistics} = \%stats;
        } else {
            $process_details{complete} = 0;
        }

        $instBase = $inst{next};

        delete $inst{stats};
        delete $inst{next};
        delete $inst{sysId};
        delete $inst{debugFlags} if ( !$inst{debugFlags} );

        $process_details{subsystems}{ $inst{name} }{ $inst{id} } = \%inst;

    }

    # Work out if there is missing data.
    if (
        defined $process_details{subsystems}{Core}{1}{statistics}{Counter}
        {Overflow} )
    {
        if ( not defined $process_details{complete} ) {
            if ( $process_details{subsystems}{Core}{1}{statistics}{Counter}
                {Overflow}{raw}[0] == 0 )
            {
                $process_details{complete} = 1;
            } else {
                $process_details{complete} = 0;
            }
        }
    }

    return \%process_details;
}

sub total {
    my ($data_structures_aref) = @_;
    my %total;         #holds data structures keyed by name and id
    my @keys_order;    #keep the order new names and ids were encountered

    # Make an initial total by just copying the first set of stats
    # carte blance.
    my $summed_structure = dclone( $data_structures_aref->[0] );

    for ( my $cr = 1 ; $cr <= $#{$data_structures_aref} ; $cr++ ) {
        my $current_structure = $data_structures_aref->[$cr];

        # Copy the vp and nvp information, we might miss new entries
        # in current_structure but for the time being all entries
        # are known to be common.
        # XXX: This assertion no longer holds true, {complete} is only
        # defined where it is known and has values 0 and 1.
        # Having said that it will all work though as if it's value is not
        # known it can't be 1 which is the only value we care about.
        foreach my $header ( keys %{$summed_structure} ) {
            next if ( $header eq 'subsystems' );
            if ( defined $summed_structure->{$header}
                and $summed_structure->{$header} ne
                $current_structure->{$header} )
            {
                $summed_structure->{$header} = undef;
            }
        }

        #add to each set of stats if it exists, else clone the new set
        foreach my $name ( keys %{ $current_structure->{'subsystems'} } ) {
            if ( $summed_structure->{'subsystems'}{$name} ) {
                foreach
                  my $id ( keys %{ $current_structure->{'subsystems'}{$name} } )
                {
                    if ( $summed_structure->{'subsystems'}{$name}{$id} ) {

                        next
                          unless (
                            defined $current_structure->{'subsystems'}{$name}
                            {$id}{'statistics'} );

                        if (
                            not( $summed_structure->{'subsystems'}{$name}{$id}
                                {'statistics'} )
                          )
                        {

                            $summed_structure->{'subsystems'}{$name}{$id}
                              {'statistics'} = dclone(
                                $current_structure->{'subsystems'}{$name}{$id}
                                  {'statistics'} );
                            next;
                        }

                        my %current_stat =
                          %{ $current_structure->{'subsystems'}{$name}{$id}
                              {'statistics'} };

                        my %summed_stat =
                          %{ $summed_structure->{'subsystems'}{$name}{$id}
                              {'statistics'} };

                        #add to each type of stats if it exists, else copy
                        #the new set
                        foreach my $stat_type ( keys %current_stat ) {
                            if ( $summed_stat{$stat_type} ) {

                                foreach my $stat_name (
                                    keys %{ $current_stat{$stat_type} } )
                                {
                                    if (
                                        $summed_stat{$stat_type}->{$stat_name} )
                                    {

                                #do the adding up correctly for the type of stat
                                        $summed_stat{$stat_type}->{$stat_name} =
                                          $stat_types2{$stat_type}{sumfn}(
                                            $current_stat{$stat_type}
                                              ->{$stat_name},
                                            $summed_stat{$stat_type}
                                              ->{$stat_name}
                                          );
                                    } else {
                                        $summed_stat{$stat_type}->{$stat_name} =
                                          dclone( $current_stat{$stat_type}
                                              ->{$stat_name} );
                                    }
                                }
                            } else {
                                $summed_stat{$stat_type} =
                                  dclone( $current_stat{$stat_type} );
                            }
                        }
                    } else {
                        $summed_structure->{'subsystems'}{$name}{$id} = dclone(
                            $current_structure->{'subsystems'}{$name}{$id} );
                    }
                }
            } else {
                $summed_structure->{$name} =
                  dclone( $current_structure->{$name} );
            }
        }
    }

    return $summed_structure;
}

# Convert from long to terse stats.
sub summarise {

    my $datastructure = shift;
    my %ret           = (
        'Bin'     => 0,
        'Counter' => 0,
        'Tally'   => 0
    );
    if ( defined $datastructure->{'vp'} ) {
        $ret{'vp'} = $datastructure->{'vp'};
    }
    foreach my $subsystem ( keys %{ $datastructure->{'subsystems'} } ) {
        foreach my $id ( keys %{ $datastructure->{'subsystems'}{$subsystem} } )
        {
            my $statistics =
              $datastructure->{'subsystems'}{$subsystem}{$id}{'statistics'};
            foreach my $bin ( keys %{ $statistics->{'Bin'} } ) {

                #Bin has a total value so just add that
                $ret{'Bin'} += $statistics->{'Bin'}{$bin}{'raw'}[34];
            }
            foreach my $counter ( keys %{ $statistics->{'Counter'} } ) {
                $ret{'Counter'} += $statistics->{'Counter'}{$counter}{'raw'}[0];
            }
            foreach my $tally ( keys %{ $statistics->{'Tally'} } ) {
                $ret{'Tally'} += $statistics->{'Tally'}{$tally}{'raw'}[0];
            }
        }
    }
    return \%ret;
}

sub summarise_many {
    my $many = shift;
    my @ret;
    foreach my $single ( @{$many} ) {
        push( @ret, summarise($single) );
    }
    return \@ret;
}

sub collapse_summaries {
    my $summaries = shift;
    my %ret       = (
        'Bin'     => 0,
        'Counter' => 0,
        'Tally'   => 0
    );
    foreach my $summary ( @{$summaries} ) {
        foreach my $key ( keys %ret ) {
            $ret{$key} += $summary->{$key};
        }
    }
    return \%ret;
}

sub display_hash {
    my $hash = shift;
    format WITH_VP =
vp @>>>> Counter @>>>>>>>>> Tally @>>>>>>>>> Bin @>>>>>>>>>>>>>>
$hash->{vp}, $hash->{Counter}, $hash->{Tally}, $hash->{Bin}
.
    format WITHOUT_VP =
Counter @>>>>>>>>> Tally @>>>>>>>>> Bin @>>>>>>>>>>>>>>
$hash->{Counter}, $hash->{Tally}, $hash->{Bin}
.
    local $~;
    if ( defined $hash->{vp} ) {
        $~ = "WITH_VP";
    } else {
        $~ = "WITHOUT_VP";
    }
    write STDOUT;
}

sub display_hashes {
    my ( $hashes, $sort, $reverse ) = @_;
    my $ret = '';

    my $rev = $reverse;

    $rev = not $rev if ( $sort eq "vp" );

    if ($rev) {
        foreach my $e ( sort { $a->{$sort} <=> $b->{$sort} } ( @{$hashes} ) ) {
            $ret .= display_hash($e);
        }
    } else {
        foreach my $e ( sort { $b->{$sort} <=> $a->{$sort} } ( @{$hashes} ) ) {
            $ret .= display_hash($e);
        }
    }
    return $ret;
}

# FIXME:  This function really should be merged with as show_inst...
sub show_name {
    my ( $des, $stats ) = @_;

    if ( not defined $des ) {
        return show_inst($stats);
    }

    my @req = split( "\\.", $des );

    my $ret = "";

    foreach my $name2 ( sort keys %{ $stats->{subsystems} } ) {
        my $name = $stats->{subsystems}{$name2};

        next unless ( lc($name2) eq lc( $req[0] ) );

        foreach my $id2 ( sort { $a <=> $b } keys %{$name} ) {
            my $sis = $name->{$id2};

            next if ( $#req > 0 and $sis->{id} ne $req[1] );

            if ( $#req < 2 ) {
                $ret .=
"Subsystem '$sis->{name}' id: $sis->{id}  Handle: $sis->{handle} rail: $sis->{rail}\n";
            }

            foreach my $type (@display_order) {
                next unless defined $sis->{statistics}{$type};

                if ( $#req > 1 ) {

                    foreach
                      my $s_name ( sort keys %{ $sis->{statistics}{$type} } )
                    {
                        next if ( $#req > 1 and lc($s_name) ne lc( $req[2] ) );
                        $ret .= "@{$sis->{statistics}{$type}{$s_name}{raw}}\n";
                    }
                } else {
                    if ( defined $stat_types2{$type}{displayfn} ) {
                        $ret .=
                          $stat_types2{$type}{displayfn}(
                            $sis->{statistics}{$type} );
                    }
                }
            }
        }
    }
    return $ret;
}

sub show_inst {
    my ($stats) = @_;

    my $ret;

    if ( defined $stats->{vp} ) {
        $ret = "This is vp $stats->{vp}/$stats->{nvp}\n";
    } else {
        $ret = "Statistics for a $stats->{nvp} process job\n";
    }

    foreach my $name2 ( sort keys %{ $stats->{subsystems} } ) {
        my $name = $stats->{subsystems}{$name2};

        foreach my $id2 ( sort { $a <=> $b } keys %{$name} ) {
            my $sis = $name->{$id2};

            $ret .=
"Subsystem '$sis->{name}' id: $sis->{id}  Handle: $sis->{handle} rail: $sis->{rail}\n";

            if ( not defined $sis->{statistics} ) {
                $ret .= "no statistics recorded.\n";
                next;
            }

            foreach my $type (@display_order) {
                next unless defined $sis->{statistics}{$type};

                if ( defined $stat_types2{$type}{displayfn} ) {
                    $ret .=
                      $stat_types2{$type}{displayfn}(
                        $sis->{statistics}{$type} );

                }
            }
        }
    }
    return $ret;
}

sub read_stats {
    my @data = @_;

    my $header = parse_header( shift @data );

    return undef unless $header;

    my @out;
    foreach my $vp (@data) {
        my $parsed = parse_content( $vp, $header );
        if ( defined $parsed ) {
            push( @out, $parsed );
        }
    }

    return \@out;
}

sub show_stats {
    my $d = shift;

    # This function is slightly delicate, the --full-report option
    # calls this function with $stats_total and $group set.

    # What to do about the -r option:
    # If it's set then display individual results for the given
    # vp's only, if it's not set then display a total for everyone.

    if ( not $d ) {
        print("QsNet Statistics not valid\n");
        return;
    }

    my $stats_total = 0;
    my $group       = 0;

    if ($stats_total) {

        if ( $conf{stats_short} ) {
            my $new;
            if ( defined $rank_rng ) {
                my @ret;
                my $rng = rng_dup($rank_rng);
                while ( defined( my $rank = rng_shift($rng) ) ) {
                    if ( defined $d->[$rank] ) {
                        push( @ret, summarise( $d->[$rank] ) );
                    } else {
                        my $vps = $#{$d} + 1;
                        print "Invalid rank $rank (0 to $vps)\n";
                    }
                }
                $new = \@ret;
            } else {
                $new = summarise_many($d);
            }

            display_hashes( $new, $conf{stats_sort_key}, $conf{stats_reverse} );
            return;
        }

        if ( defined $rank_rng ) {
            my $rng = rng_dup($rank_rng);
            while ( defined( my $rank = rng_shift($rng) ) ) {
                if ( defined $d->[$rank] ) {
                    print show_name $conf{stats_name}, $d->[$rank];
                } else {
                    my $vps = $#{$d} + 1;
                    print "Invalid rank $rank (0 to $vps)\n";
                }
            }
        } else {
            print show_name $conf{stats_name}, total($d);
        }
    }

    if ($group) {
        print group_status($d);
    }
}

###############################################################################
#
# Group deadlock detection
#
###############################################################################

sub group_status_helper {
    my $str        = shift;    # tagged onto the end of the line.
    my $possessive = shift;    # syntax to use (possessive/attributive)
    my $size       = shift;    # size of the group
    my @identical  = @_;       # member list
    my $ret;
    my $sstr = defined $size ? " (size $size)" : "";

    my $members = "members";
    my $are     = "are";
    my $have    = "have";

    if ( $#identical == 0 ) {
        $members = "member";
        $are     = "is";
        $have    = "has";
    }

    if ($possessive) {
        $are = $have;
    }

    $ret .=
      sprintf( "Group $members %s$sstr $are $str.\n", compress(@identical) );

    return $ret;
}

sub group_status {
    my $data_structures_aref = shift;

    my %ad;

    my %tg;

    if ( $#target_groups != -1 ) {
        foreach my $gid (@target_groups) {
            $tg{$gid}++;
        }
    }

    # Loop over each vp...
    foreach my $dataset ( @{$data_structures_aref} ) {

        # Loop over each group within the process.
        foreach my $gid ( keys %{ $dataset->{'subsystems'}{'Group'} } ) {

            if ( $#target_groups != -1 ) {
                next unless defined $tg{$gid};
            }

            my $str;

            my $this_group = $dataset->{'subsystems'}{'Group'}{$gid};

            my $ident = $dataset->{vp};

            if ( $this_group->{'statistics'} ) {

                # XXX: Why is this first test here,
                if (    $this_group->{statistics}{Attribute}
                    and $this_group->{statistics}{Attribute}{Self} )
                {
                    $ident = $this_group->{statistics}{Attribute}{Self}{raw}[0];
                    $ad{$gid}{size} =
                      $this_group->{statistics}{Attribute}{Size}{raw}[0];
                    $ad{$gid}{map}[$ident] = $dataset->{vp}
                      if ( $conf{show_group_members} );
                }

                $ad{$gid}{idents}{$ident}{'statistics'}++;

                foreach
                  my $tally ( keys( %{ $this_group->{statistics}{Tally} } ) )
                {
                    my $name = $this_group->{statistics}{Tally}{$tally}{'name'};
                    my $number =
                      $this_group->{statistics}{Tally}{$tally}{'raw'}[0];
                    my $active =
                      $this_group->{statistics}{Tally}{$tally}{'raw'}[1];
                    if ( $active != 0 ) {
                        $ad{$gid}{'active'}{$name}++;
                        $ad{$gid}{idents}{$ident}{'active'}{$name} = $number;
                    } else {
                        $ad{$gid}{idents}{$ident}{'inactive'}{$name} = $number;
                    }
                }
            }
            $ad{$gid}{idents}{$ident}{'valid'} = $this_group->{'valid'};
        }
    }

    my $ret = "";
    my $missing_self;
    my $i_count = 0;    # Interesting groups.
    my $d_count = 0;    # Destroyed groups.
    foreach my $gid ( sort { $a <=> $b } keys %ad ) {

        if ( $#target_groups != -1 ) {
            next unless defined $tg{$gid};
        }

        my $gstr = "Information for group '$gid'\n";

        # Maybe show the group members, hope that the user doesn't turn
        # this on unless also setting target_groups!
        if ( $conf{show_group_members} ) {
            $gstr .= "group has $ad{$gid}{size} members\n";
            if ( defined $ad{$gid}{size} and $gid != 1 ) {
                for ( my $ident = 0 ; $ident < $ad{$gid}{size} ; $ident++ ) {
                    $gstr .=
                      "group member[$ident] => vp[$ad{$gid}{map}[$ident]]\n";
                }
            }
        }

        my $gone;
        {
            my @invalid;
            foreach my $ident ( sort keys %{ $ad{$gid}{'idents'} } ) {
                if ( $ad{$gid}{'idents'}{$ident}{'valid'} eq 0 ) {
                    push @invalid, $ident;
                }
            }
            if ( $#invalid != -1 ) {
                if ( $conf{show_all_groups} ) {
                    $ret .= $gstr
                      . group_status_helper( "showing the group as removed",
                        0, $ad{$gid}{size}, @invalid );
                    $gstr = "";
                }
                if ( $#invalid == ( $ad{$gid}{size} - 1 ) ) {
                    $gone++;
                    $d_count++;
                }
            }
        }
        next if $gone;

        # Find and report groups which don't have statistics
        {
            my @identical;
            foreach my $ident ( sort keys %{ $ad{$gid}{'idents'} } ) {
                push( @identical, $ident )
                  unless ( $ad{$gid}{'idents'}{$ident}{'statistics'} );
            }
            if ( $#identical != -1 ) {
                $missing_self++;
                if ( $conf{show_all_groups} ) {
                    $ret .= $gstr
                      . group_status_helper(
                        "no statistics for this group *(1)",
                        1, $ad{$gid}{size}, @identical );
                    $gstr = "";
                } else {
                    $gstr .=
                      group_status_helper( "no statistics for this group *(1)",
                        1, $ad{$gid}{size}, @identical );
                }
            }
        }

        if ( $ad{$gid}{'active'} ) {
            $i_count++;

            # For all collective calls which we are interested in
            foreach my $s ( keys %{ $ad{$gid}{'active'} } ) {
                my %active;
                my %inactive;

                foreach my $ident ( keys %{ $ad{$gid}{'idents'} } ) {
                    if ( defined $ad{$gid}{'idents'}{$ident}{'active'}
                        and $ad{$gid}{'idents'}{$ident}{'active'}{$s} )
                    {
                        my $number = $ad{$gid}{'idents'}{$ident}{'active'}{$s};
                        push( @{ $active{$number} }, $ident );
                    } elsif ( $ad{$gid}{'idents'}{$ident}{'inactive'}{$s} ) {
                        my $number =
                          $ad{$gid}{'idents'}{$ident}{'inactive'}{$s};
                        push( @{ $inactive{$number} }, $ident );
                    }
                }
                foreach my $number ( sort ( keys %active ) ) {
                    $ret .= $gstr
                      . group_status_helper( "in call $number to $s",
                        0, $ad{$gid}{size}, @{ $active{$number} } );
                    $gstr = "";

                }
                foreach my $number ( sort ( keys %inactive ) ) {
                    $ret .= group_status_helper( "completed call $number to $s",
                        1, $ad{$gid}{size}, @{ $inactive{$number} } );
                }
            }
        } else {
            next unless ( $conf{show_all_groups} );
        }

        {
            my @inactive;
            foreach my $ident ( sort keys %{ $ad{$gid}{'idents'} } ) {
                if ( $ad{$gid}{'idents'}{$ident}{'statistics'}
                    and not defined $ad{$gid}{'idents'}{$ident}{'active'} )
                {
                    push( @inactive, $ident );
                }
            }
            if ( $#inactive != -1 ) {
                $ret .= $gstr
                  . group_status_helper( "not in a call to the collectives",
                    0, $ad{$gid}{size}, @inactive );
                $gstr = "";
            }
        }
    }

    my $count = keys(%ad);

    if ( $count == 1 ) {
        my $use_str = ( $i_count == 1 ) ? "" : " not";
        $ret .= "Total: $count group which is$use_str in use.\n";
    } else {
        my $d_str = ( $d_count == 1 ) ? "is" : "are";
        my $i_str = ( $i_count == 1 ) ? "is" : "are";
        $ret .=
"Total: $count groups of which $d_count $d_str destroyed and $i_count $i_str in use.\n";
    }

    if ($missing_self) {
        $ret .= "\n(1) Groups that have no statistics are reported by vp\n";
        $ret .= "rather than group id\n";
    }

    return "$ret";
}

###############################################################################
#
# Local (per node) stats.
#
###############################################################################

sub local_stats_from_job {
    my $job = shift;

    printf("Showing local job $job\n");

    my $key = rms_job_to_key($job);

    if ( not defined $key ) {
        printf("Cannot find key for local job $job\n");
        return;
    }

    my @data;
    open( PCMD, "edb -k $key --stats-raw 2>/dev/null|" )
      or confess "$prog: cant open file: $!\n";
    local $/ = "\n\n";
    while (<PCMD>) {
        s/\n//g;
        push @data, $_;
    }

    my $s = read_stats(@data);

    # $stats_total = 1;

    show_stats($s);
}

# Show stats for all jobs on this node.
sub local_stats {
    opendir( DH, "/proc/rms/programs" );
    my @files = readdir(DH);
    closedir(DH);

    foreach my $job (@files) {
        next if ( $job eq ".." );
        next if ( $job eq "." );

        local_stats_from_job($job);

    }
}

###############################################################################
#
# Stack trace tree compression.
#
###############################################################################

#
# Compare two lists-o-strings
#	\@l1 (IN)	list1
#	\@l2 (IN)	list2
#	RETURN		1 if match, 0 if not
#
sub cmp_list {
    my ( $l1, $l2 ) = @_;

    if ( $#{$l1} != $#{$l2} ) {
        return 0;
    }

    for ( my $i = 0 ; $i <= $#{$l1} ; $i++ ) {
        if ( !defined( ${$l2}[$i] ) || ${$l1}[$i] ne ${$l2}[$i] ) {
            return 0;
        }
    }

    return 1;
}

# This function returns an reference to an array of hashes, each
# hash containing the "txt" of the function name and a further array
# of hash references called "children".
sub go_p {
    my ( $level, $lines, @tags ) = @_;

    my @peers;
    my $prev;
    my $tag = $tags[0];

    debug_log( "tree", \@tags, "called tag:%s, level:%d", $tag, $level );

    return if ( !defined($tag) );
    return if ( !defined( $lines->{$tag} ) );

    my @identical = ();
    my @different = ();

    my $endlevel = $level;

    # Populate the two lists, @identical and @different
    my $line = $lines->{$tag}[$level];
    if ( defined $line ) {
        foreach my $tag2 (@tags) {
            next if ( $tag2 eq $tag );
            if ( defined( $lines->{$tag2}[$level] )
                and $line eq $lines->{$tag2}[$level] )
            {
                push( @identical, $tag2 );
                delete( $lines->{$tag2}[$level] );
            } else {
                push( @different, $tag2 );
            }
        }
    } else {
        foreach my $dtag (@tags) {
            if ( $dtag != $tag ) {
                push( @different, $dtag );
            }
        }

    }

    # Move $endlevel on as far as we can...
    if ( $#identical >= 0 ) {
        my $nextIdentical;
        do {
            $nextIdentical = 0;
            my $nextFound = 0;
            $endlevel++;
            if ( defined $lines->{$tag}[$endlevel] ) {
                foreach my $tag2 (@identical) {
                    if ( defined( $lines->{$tag2}[$endlevel] )
                        and $lines->{$tag}[$endlevel] eq
                        $lines->{$tag2}[$endlevel] )
                    {
                        $nextFound++;
                    }
                }
            }
            if ( ( $#identical + 1 ) == $nextFound ) {
                $nextIdentical = 1;
            }
        } while $nextIdentical;
        $endlevel--;
    } else {
        $endlevel = ( $#{ $lines->{$tag} } );
    }

    debug_log(
        "tree", undef,

"level $level, endlevel $endlevel, identical:@identical different:@different"
    );

    for ( my $l = $level ; $l <= $endlevel ; $l++ ) {

        my %this;
        $this{txt} = $lines->{$tag}[$l];
        @{ $this{vps} } = ( $tag, @identical );
        $this{vpspec} = compress( @identical, $tag );

        if ( defined $prev ) {
            push @{ $prev->{children} }, \%this;
        } else {
            push @peers, \%this;
        }

        $prev = \%this;

    }

    if ( $#identical >= 0 ) {

        if ( $endlevel != $#{ $lines->{$tag} } + 1 ) {
            unshift @identical, $tag;
        }

        $prev->{children} = go_p( $endlevel + 1, $lines, @identical );
    }

    debug_log(
        "tree", undef,

"returning level:$level endlevel:$endlevel identical:@identical different:@different"
    );

    if (@different) {
        my $new = go_p( $level, $lines, @different );
        foreach my $n ( @{$new} ) {
            push @peers, $n;
        }
    }

    return \@peers;
}

# Takes a ref to a array of hashes...
sub _show_tree {

    my ( $ref, $parent, $indent ) = @_;

    my $ret = "";
    my @peers = sort ( { $a->{vps}[0] <=> $b->{vps}[0] } ( @{$ref} ) );

    foreach my $peer (@peers) {

        if ( $#peers != 0 or not defined $parent or $parent ne $peer->{vpspec} )
        {
            my $count   = $#{ $peer->{vps} } + 1;
            my $i_level = "$peer->{vpspec} ($count processes)";
            $ret .= "$indent-----------------\n";
            $ret .= "$indent$i_level\n";
            $ret .= "$indent-----------------\n";
        }

        $ret .= "$indent$peer->{txt}\n";
        if ( defined $peer->{children} ) {
            $ret .=
              _show_tree( $peer->{children}, $peer->{vpspec}, "$indent  " );
        }
    }
    return $ret;
}

sub show_tree {
    my $ref = shift;
    debug_log( "tree", $ref, "Complete tree" );
    return _show_tree( $ref, undef, "" );
}

# This function is used to process the line tags, it changes fab0 fab1 fab10 into
# fab[0-1,10].
sub compress {
    my %rng  = comp(@_);
    my @list = ();

    # comp returns a hash of arrays, the hash keys are the machines names
    # eg "fab" or "fabi", the arrays have zero or more elements in each one
    # specifies a node-spec.  If there is only one element in the array and
    # it doesn't contain a "-" then don't put square braces around the list
    # contents

    local $" = ",";    # "

    @list = map {
        $_
          . (
            @{ $rng{$_} } > 1 || ${ $rng{$_} }[0] =~ /-/
            ? "[@{$rng{$_}}]"
            : "@{$rng{$_}}"
          )
    } sort keys %rng;

    return wantarray ? @list : "@list";
}

# sortn:
#
# # sort a group of alphanumeric strings by the last group of numerals in
# that string
#
sub sortn {
    map { $$_[0] }
      sort { ( $$a[1] || 0 ) <=> ( $$b[1] || 0 ) } map { [ $_, /(\d*)$/ ] } @_;
}

sub comp {
    my (%i) = ();
    my (%s) = ();

    # turn off warnings here to avoid perl complaints about
    # uninitialized values for members of %i and %s
    local ($^W) = 0;
    push(
        @{
            $s{ $$_[0] }[
              (
                  $s{ $$_[0] }[ $i{ $$_[0] } ]
                    [ $#{ $s{ $$_[0] }[ $i{ $$_[0] } ] } ] == ( $$_[1] - 1 )
              ) ? $i{ $$_[0] } : ++$i{ $$_[0] }
            ]
          },
        ( $$_[1] )
    ) for map { [/(.*?)(\d*)$/] } sortn(@_);

    for my $key ( keys %s ) {
        @{ $s{$key} } =
          map { $#$_ > 0 ? "$$_[0]-$$_[$#$_]" : @{$_} } @{ $s{$key} };
    }

    return %s;
}

###############################################################################
#
# RMS support.
#
###############################################################################

sub find_exe {
    my $name = shift;
    foreach my $dir ( split( ":", $ENV{PATH} ) ) {
        return 1 if ( -x "$dir/$name" );
    }
    return 0;
}

sub rms_is_installed {
    return ( find_exe("prun") and find_exe("rmsquery") );
}

sub rms_get_jobs {
    my $user = shift;
    my @res =
`rmsquery "select jobs.name from jobs,resources where jobs.status=\'running\' and jobs.resource = resources.name and resources.username=\'$user\'"`;
    chomp @res;
    return @res;
}

sub rms_job_is_running {
    my $job    = shift;
    my $status = `rmsquery "select status from jobs where name=\'$job\'"`;
    chomp $status;
    return ( $status eq "running" );
}

sub rms_job_to_key {
    my $job = shift;
    return ( $job << 9 ) - 1;
}

sub rms_setup_pcmd {
    my $job = shift;

    my $res = rms_job_to_resource($job);

    my $ncpus = rms_job_to_ncpus($job);

    my $nhosts = rms_job_to_nhosts($job);

    if ( $res eq "" ) {
        printf("Job '$job' doesn't have a associated resource\n");
        return undef;
    }

    # Try to prevent zombie jobs, fairly rare but I have seen
    # nodes run different versions of edb which can cause problems
    # XXX: Fixme.  This isn't high enough.
    if ( $conf{prun_exittimeout} != 0 ) {
        $ENV{RMS_EXITTIMEOUT} = $conf{prun_exittimeout};
    }

    if ( $conf{prun_timeout} != 0 ) {
        $ENV{RMS_TIMELIMIT} = $conf{prun_timeout};
    }

    {

        # Work around a couple of bugs in RMS
        # the first one is really old and was there
        # for a while, the second one is limited
        # to 'qsrmslibs-2.82-15'
        my $partition = rms_res_to_partition($res);
        $ENV{RMS_PARTITION}  = $partition;
        $ENV{RMS_RESOURCEID} = "$partition.$res";
    }

    my $cmd = "prun -i /dev/null -T $res";

    return ( $cmd, $ncpus, $nhosts );
}

# Not exported...
sub rms_job_to_resource {
    my $job = shift;
    my $res = `rmsquery "select resource from jobs where name=\'$job\'"`;
    chomp $res;
    return $res;
}

sub rms_job_to_ncpus {
    my $job   = shift;
    my $cpus  = `rmsquery "select cpus from jobs where name=\'$job\'"`;
    my $nodes = `rmsquery "select nodes from jobs where name=\'$job\'"`;

    chomp $cpus;
    chomp $nodes;

    my $ncpus = 0;

    my @c = map { $_ =~ /(\d+)-(\d+)/ ? $2 - $1 + 1 : 1 } ( split " ", $cpus );

    my @n = map { $_ =~ /(\d+)-(\d+)/ ? $2 - $1 + 1 : 1 } ( split " ", $nodes );

    for ( my $idx = 0 ; $idx <= $#n ; $idx++ ) {
        $ncpus += $n[$idx] * $c[$idx];
    }

    printf("extracted $ncpus from $cpus and $nodes\n") if $conf{verbose} > 1;

    return $ncpus;
}

sub rms_job_to_nhosts {
    my $job      = shift;
    my $nodeSpec = `rmsquery "select hostnames from jobs where name=\'$job\'"`;

    chomp $nodeSpec;
    my $i;
    my @nodeList;
    my $prefix;
    my $suffix;

    # deal with multiple entries
    foreach ( split( " ", $nodeSpec ) ) {
        if (m/([^\[]+)\[([0-9-,]+)\]([^\[]*)/) {
            $prefix = $1;
            $suffix = $3;

            foreach ( split( ",", $2 ) ) {
                if ( !m/([0-9]+)-?([0-9]+)?/ ) {
                    print "malformed nodespec '$_'\n";
                    exit(1);
                }

                if ( defined($2) ) {

                    # square braces with range, eg 		    'machine[0-3]'
                    for ( $i = $1 ; $i <= $2 ; $i++ ) {
                        push( @nodeList, $prefix . $i . $suffix );
                    }
                } else {

                    # no range, just suffix
                    push( @nodeList, $prefix . $1 . $suffix );
                }
            }
        } else {

            # no square braces, just node name, eg 'machine0'
            if ( !m/([^\[]+)([0-9]+)([^\[]*)/ ) {
                print "malformed nodespec '$_'\n";
                exit(1);
            }

            push( @nodeList, $1 . $2 . $3 );
        }
    }

    return $#nodeList + 1;
}

sub rms_res_to_partition {
    my $res  = shift;
    my $part = `rmsquery "select partition from resources where name=\'$res\'"`;
    chomp $part;
    return $part;
}

###############################################################################
#
# Slurm support.
#
###############################################################################

sub slurm_is_installed {
    return ( find_exe("srun") and find_exe("squeue") and find_exe("scontrol") );
}

sub slurm_get_jobs {
    my $user = shift;
    my @res  = `squeue -t running -u $user -h -o "%i" 2>/dev/null`;
    chomp @res;
    return @res;
}

# Query the process count for the "step" as that's how many
# processes we are going to be looking for.
sub slurm_job_to_ncpus {
    my $job   = shift;
    my $s     = "$job." . $conf{slurm_job_step};
    my @steps = `squeue -s $s -o "%i %A" 2>/dev/null`;
    return undef if ( $? != 0 );

    # The %A option is new so ensure we have the TASKS output
    # before we believe what we see here...
    # Mind you %A is several years old now so if it's not there
    # we probably can't do anything anyway.
    my $tasks;
    my $have_tasks = 0;
    foreach my $step (@steps) {
        my ( $step, $cpus ) = split( " ", $step );
        $tasks      = $cpus if ( $step eq $s );
        $have_tasks = 1     if ( $cpus eq "TASKS" );
    }
    return $tasks if $have_tasks;
    return undef;
}

# Query the nodecount for the "job" as that is what we shall be running on.
sub slurm_job_to_nodecount {
    my $job  = shift;
    my @jobs = `squeue -o "%i %D" 2>/dev/null`;
    return undef if ( $? != 0 );

    foreach my $step (@jobs) {
        my ( $left, $right ) = split( " ", $step );
        return $right if ( $left eq $job );
    }
    return undef;
}

# Query the node list for the "step" which isn't the same as the node list
# for the job, care should be taken if using this function to ensure this
# is correct.
# This functions isn't used currently.
sub slurm_job_to_nodelist {
    my $job   = shift;
    my $s     = "$job." . $conf{slurm_job_step};
    my @steps = `squeue -s $s -o "%i %N" 2>/dev/null`;
    return undef if ( $? != 0 );

    foreach my $step (@steps) {
        my ( $left, $right ) = split( " ", $step );
        return $right if ( $left eq $s );

    }
    return undef;
}

sub slurm_job_is_running {
    my $job    = shift;
    my $status = lc `squeue -h -j $job -o "%T"`;
    chomp $status;
    return ( $status eq "running" );
}

sub slurm_setup_pcmd {
    my $job  = shift;
    my $cpus = slurm_job_to_ncpus($job);
    my $nc   = slurm_job_to_nodecount($job);
    return ( "srun --jobid=$job", $cpus, $nc );
}

###############################################################################
#
# Local support.
#
###############################################################################

sub local_get_jobs {
    my $user = shift;
    opendir( DIR, "/proc/" );
    my @pids = readdir(DIR);
    closedir(DIR);
    my @jobs;
    my $tuid = getpwnam($user);
    return unless defined $tuid;

    foreach my $pid (@pids) {
        next unless ( $pid =~ /^\d+$/ );

        my (
            $dev,  $ino,   $mode,  $nlink, $uid,     $gid, $rdev,
            $size, $atime, $mtime, $ctime, $blksize, $blocks
        ) = stat("/proc/$pid");

        next unless ( $uid eq $tuid );

        push @jobs, $pid;
    }

    return @jobs;
}

sub local_fd_get_jobs_real {
    my $user = shift;
    my $file = shift;
    opendir( DIR, "/proc/" );
    my @pids = readdir(DIR);
    closedir(DIR);
    my @jobs;
    my $tuid = getpwnam($user);
    return unless defined $tuid;

    foreach my $pid (@pids) {
        next unless ( $pid =~ /^\d+$/ );

        my (
            $dev,  $ino,   $mode,  $nlink, $uid,     $gid, $rdev,
            $size, $atime, $mtime, $ctime, $blksize, $blocks
        ) = stat("/proc/$pid");

        next unless ( $uid eq $tuid );

        opendir( DIR, "/proc/$pid/fd" );
        my @fds = readdir(DIR);
        closedir(DIR);
        foreach my $fd (@fds) {
            my $target = readlink("/proc/$pid/fd/$fd");
            next unless $target;
            if ( $target eq $file ) {
                push @jobs, $pid;
                last;
            }
        }
    }

    return @jobs;
}

sub local_fd_get_jobs {
    my $user = shift;
    return local_fd_get_jobs_real( $user, $conf{local_fd_name} );
}

sub local_q_is_installed {
    return ( -d "/proc/qsnet" );
}

sub local_q_get_jobs {
    my $user = shift;
    return local_fd_get_jobs_real( $user, "/proc/qsnet/elan/user" );
}

sub local_job_is_running {
    my $job = shift;
    return ( -d "/proc/$job" );
}

sub local_setup_pcmd {
    return ( "", 1, 1 );
}

###############################################################################
#
# mpd support.
#
###############################################################################

sub mpd_is_installed {
    return ( find_exe("mpdlistjobs") and find_exe("mpdrun") );
}

sub mpd_get_data {
    open( MPD, "mpdlistjobs|" ) or return;
    my @out = <MPD>;
    close MPD;
    my %jobs;
    my $job;
    my $host;
    my $pid;
    foreach my $l (@out) {
        my ( $key, $value ) = split( "= ", $l );
        next unless $value;
        $key =~ s/ //g;
        chomp $value;
        if ( $key eq "jobid" ) {
            my ( $j, $host ) = split( "@", $value );
            $job = $j;
        }
        if ( $key eq "username" ) {
            $jobs{$job}{user} = $value;
        }
        if ( $key eq "host" ) {
            $host = $value;
            $jobs{$job}{host}{$value}++;
        }
        if ( $key eq "pid" ) {
            $pid = $value;
            $jobs{$job}{pids}{$host}{$value}++;
        }
        if ( $key eq "rank" ) {
            $jobs{$job}{pids}{$host}{$pid} = $value;
            if (   ( not defined $jobs{$job}{lastproc} )
                or ( $value > $jobs{$job}{lastproc} ) )
            {
                $jobs{$job}{lastproc} = $value;
            }
        }
    }
    return \%jobs;
}

# There is a bug here I think, $user isn't used anywhere
# which is probably bad.
sub mpd_get_jobs {
    my $user = shift;

    my $d    = mpd_get_data();
    my @jobs = keys %{$d};
    return @jobs;
}

my $mpd_dfile;

sub mpd_setup_pcmd {
    my $job = shift;

    my $d = mpd_get_data();

    my @hosts = keys %{ $d->{$job}{host} };
    my $i     = @hosts;

    my ( $fh, $fn ) = tempfile("/tmp/padb.XXXXXXXX");
    foreach my $host (@hosts) {
        print $fh "$host:1\n";
    }
    close $fh;

    $mpd_dfile = $fn;

    my $cmd = "mpdrun -machinefile $fn -np $i";

    my $hosts = $#hosts + 1;

    return ( $cmd, $d->{$job}{lastproc} + 1, $hosts );
}

sub mpd_cleanup_pcmd {
    unlink($mpd_dfile) if ( defined($mpd_dfile) );
}

###############################################################################
#
# open support.
#
###############################################################################

sub find_ompi_prefix {
    my $name = "ompi-ps";
    foreach my $dir ( split( ":", $ENV{PATH} ) ) {
        next unless ( -x "$dir/$name" );
        my @d = split( "/", $dir );
        pop @d;
        my $prefix = join( "/", @d );
        return "--prefix $prefix";
    }
    return "";
}

sub open_is_installed {
    return ( find_exe("ompi-ps") and find_exe("orterun") );
}

my %open_jobs;

sub open_get_data {

    # Simply return if called more than once.
    if ( keys(%open_jobs) != 0 ) {
        return;

    }

    my $job;

    open( OPEN, "ompi-ps|" ) or return;
    my @out = <OPEN>;
    close OPEN;

    # Handle being called multiple times, zero the hash every
    # time we are called.  Of course we could just return the
    # existing hash which might be quicker.
    %open_jobs = ();

    foreach my $l (@out) {
        chomp $l;
        next if ( $l eq "" );

        if ( $l =~ /Information from mpirun \[(\d+)\,0\]/ ) {

            $job = $1;
        } else {
            my @elems = split( /\|/, $l );

            if ( $#elems == 4 ) {
                my $nprocs = $elems[3];
                $nprocs =~ s/ //g;
                $open_jobs{$job}{nprocs} = $nprocs;
            } elsif ( $#elems == 6 ) {

                my $host = $elems[4];
                $host =~ s/ //g;
                $host =~ s/\t//g;
                next if $host eq "Node";
                $open_jobs{$job}{hosts}{$host}++;

                my $name = $elems[1];
                $name =~ /\[\[(\d+)\,(\d+)\]\,(\d+)\]/;
                my $rank = $3;

                my $pid = $elems[3];
                $rank =~ s/ //g;
                $pid  =~ s/ //g;
                $open_jobs{$job}{ranks}{$host}{$rank} = $pid;
            }
        }

    }
}

sub open_get_jobs {
    my $user = shift;

    open_get_data();
    return keys %open_jobs;
}

my $open_dfile;

sub open_setup_pcmd {
    my $job = shift;

    open_get_data();

    my @hosts = keys %{ $open_jobs{$job}{hosts} };
    my $i     = @hosts;

    my ( $fh, $fn ) = tempfile("/tmp/padb.XXXXXXXX");

    foreach my $host (@hosts) {
        print $fh "$host\n";
    }
    close $fh;

    $open_dfile = $fn;

    my $prefix = find_ompi_prefix();
    my $cmd    = "orterun -machinefile $fn -np $i $prefix";
    my $hosts  = $#hosts + 1;

    return ( $cmd, $open_jobs{$job}{nprocs}, $hosts );
}

sub open_cleanup_pcmd {
    unlink($open_dfile) if ( defined($open_dfile) );
}

###############################################################################
#
# lsf support.
#
###############################################################################

sub lsf_is_installed {

    # Check for both LSF and RMS, I know LSF works in other ways but I don't
    # know how to launch jobs then...
    return ( find_exe("bjobs") and rms_is_installed() );
}

sub lsf_get_jobs {
    my $user = shift;

    my @jobs;

    open( LSF, "bjobs -r -u $user 2>/dev/null|" ) or return;
    my @out = <LSF>;
    close LSF;
    foreach my $l (@out) {
        my ( $job, $user, $stat, $queue, $from, $exec, $name, $time ) =
          split( " ", $l );
        next if ( $job eq "JOBID" );
        next unless ( defined $time );
        push @jobs, $job;
    }

    return @jobs;
}

# This is a little odd, lsf allocates a resource and then pruns (-n1) the users script
# inside that resource.  That script then calls prun which is the real parallel job,
# In essence then you get one resource and (at least) two jobs, padb needs to target
# the second one.  This is controlled by the -Olsf_job_offset option, the default being
# one.
sub lsf_setup_pcmd {
    my $job = shift;

    my $machine = `rinfo -m`;
    chomp $machine;
    my $query =
      "select name,ncpus from resources where batchid=\'$machine\@$job\'";
    my $result = `rmsquery "$query"`;

    my ( $res, $ncpus ) = split( " ", $result );

    open( QUERY,
"rmsquery \"select name from jobs where jobs.resource=\'$res\' and status = \'running\' order by name\"|"
    );
    my @out = <QUERY>;
    close QUERY;

    my $rjob;

    my $idx = $conf{lsf_job_offset};
    $idx = 1 if ( $idx > $#out );
    $rjob = $out[$idx];
    chomp $rjob;
    $rem_jobid = $rjob;

    my $cmd = "prun -i /dev/null -T $res";

    return ( $cmd, $ncpus );
}

###############################################################################
#
# Resource manager support.
#
###############################################################################

sub setup_rmgr {
    $conf{rmgr} = shift;

    # Now setup the variable for the rest of the program.
    if ( defined $rmgr{ $conf{rmgr} }{inner_rmgr} ) {
        $cinner{rmgr} = $rmgr{ $conf{rmgr} }{inner_rmgr};
    } else {
        $cinner{rmgr} = $conf{rmgr};
    }
}

sub find_rmgr {

# If it's been set on the command line and it's valid then just use what we are given.
# Do sanity checks here but only warn on the result to cope with non-default installs.

    if ( defined $conf{rmgr} ) {
        if ( not defined $rmgr{ $conf{rmgr} } ) {
            printf("Error, resource manager \"$conf{rmgr}\" not supported\n");
            exit(1);
        }

        if ( defined $rmgr{ $conf{rmgr} }{is_installed}
            and not $rmgr{ $conf{rmgr} }{is_installed}() )
        {
            printf(
"Warning: Selected resource manager $conf{rmgr} does not appear to be installed\n"
            );
        }
        setup_rmgr( $conf{rmgr} );
        return;
    }

    my @ok;
    foreach my $res ( sort( keys %rmgr ) ) {
        next unless defined $rmgr{$res}{is_installed};
        if ( $rmgr{$res}{is_installed}() ) {
            push @ok, $res;
        }
    }
    if ( $#ok != 0 ) {
        printf(
"Error, multiple resource managers detected, use -Ormgr=<resource manager>\n"
        );
        push @ok, "local-fd";
        push @ok, "local";
        printf("@ok\n");
        exit(1);
    }

    setup_rmgr( $ok[0] );
}

# Find any active resource manager, that is --any or --all
# have been passed on the command line so look for any resource
# manager that have active jobs, if there is one active resource
# manager use that one, if there are zero or many exit with an
# error.
sub find_any_rmgr {

# If it's been set on the command line and it's valid then just use what we are given.
# Do sanity checks here but only warn on the result to cope with non-default installs.

    if ( defined $conf{rmgr} ) {
        if ( not defined $rmgr{ $conf{rmgr} } ) {
            printf("Error, resource manager \"$conf{rmgr}\" not supported\n");
            exit(1);
        }

        if ( defined $rmgr{ $conf{rmgr} }{is_installed}
            and not $rmgr{ $conf{rmgr} }{is_installed}() )
        {
            printf(
"Warning: Selected resource manager $conf{rmgr} does not appear to be installed\n"
            );
        }
        setup_rmgr( $conf{rmgr} );
        return;
    }

    my @installed;
    foreach my $res ( sort( keys %rmgr ) ) {
        next unless defined $rmgr{$res}{is_installed};
        if ( $rmgr{$res}{is_installed}() ) {
            push @installed, $res;

        }
    }

    # One resource manager is installed, good.
    if ( $#installed == 0 ) {
        setup_rmgr( $installed[0] );
        return;
    }

    # No resource managers are installed, bad.
    if ( $#installed == -1 ) {
        printf(
"Error, multiple resource managers detected, use -Ormgr=<resource manager>\n"
        );
        push @installed, "local-fd";
        push @installed, "local";
        printf("@installed\n");
        exit(1);
    }

    my @active;
    foreach my $res (@installed) {
        my @jobs = $rmgr{$res}{get_active_jobs}($user);
        if ( $#jobs != -1 ) {
            push @active, $res;
        }
    }

    # Only one resource manager has active jobs, let's use it.
    if ( $#active == 0 ) {
        setup_rmgr( $active[0] );
        return;
    }

    # Multiple resource managers are installed and have jobs,
    # bouce back to the user to specify which one they want.
    printf(
"Error, multiple active resource managers detected, use -Ormgr=<resource manager>\n"
    );
    push @installed, "local-fd";
    push @installed, "local";
    printf("@installed\n");
    exit(1);
}

sub get_all_jobids {
    my $user = shift;
    debug_log( "rmgr", undef, "Loading active jobs list" );
    return $rmgr{ $conf{rmgr} }{get_active_jobs}($user);
}

sub job_is_running {
    my $job = shift;

    if ( defined $rmgr{ $conf{rmgr} }{job_is_running} ) {
        return $rmgr{ $conf{rmgr} }{job_is_running}($job);
    }

    my @jobs = $rmgr{ $conf{rmgr} }{get_active_jobs}($user);
    my %j;
    map { $j{$_}++ } @jobs;
    return defined $j{$job};
}

sub job_to_key {
    my $job = shift;

    if ( defined $rmgr{ $conf{rmgr} }{job_to_key} ) {
        return $rmgr{ $conf{rmgr} }{job_to_key}($job);
    }

    return undef;
}

sub setup_pcmd {
    my $job = shift;
    return $rmgr{ $conf{rmgr} }{setup_pcmd}($job);
}

sub cleanup_pcmd {
    my $job = shift;
    if ( defined( $rmgr{ $conf{rmgr} }{cleanup_pcmd} ) ) {
        $rmgr{ $conf{rmgr} }{cleanup_pcmd}();
    }
}

###############################################################################
#
# Output formatting
#
###############################################################################

sub strip_stack_traces {
    my ( $cargs, $lines ) = @_;

    my %above;
    my %below;

    map { $above{$_}++ }
      split( ",", $conf{mode_options}{stack}{stack_strip_above} );
    map { $below{$_}++ }
      split( ",", $conf{mode_options}{stack}{stack_strip_below} );

    foreach my $tag ( keys %$lines ) {

        # There was a subtle bug here, functions from the @above_list
        # often appear below main which this code doesn't handle all that
        # well.
        my $main_idx;
        my $wait_idx = 0;
        for ( my $l = 0 ; $l < $#{ $lines->{$tag} } ; $l++ ) {
            if ( $lines->{$tag}->[$l] =~ /(\w*)\(/ ) {
                if ( defined $below{$1} ) {
                    $main_idx = $l;
                }
                if ( defined $above{$1} ) {
                    if ( defined($main_idx) ) {
                        $wait_idx = $l;
                        last;
                    }
                }
            }
        }
        $main_idx = 0 if not defined $main_idx;
        if ( $main_idx != 0 or $wait_idx != 0 ) {
            my $end =
              ( $cargs->{strip_above_wait} and $wait_idx )
              ? $wait_idx
              : $#{ $lines->{$tag} };
            my $start =
              ( $cargs->{strip_below_main} and $main_idx ) ? $main_idx : 0;

            printf( "Stripping 0.."
                  . $#{ $lines->{$tag} }
                  . " to $start..$end for $tag\n" )
              if $conf{verbose} > 1;

            my @new = @{ $lines->{$tag} };
            @new = @new[ $start .. $end ];
            $lines->{$tag} = \@new;
        }
    }
}

sub sort_proc_hashes {
    my $carg = shift;
    my $key  = shift;
    my @all  = @_;

    if ( $carg->{reverse_sort_order} ) {
        return ( reverse( sort { $a->{$key} <=> $b->{$key} } @all ) );
    } else {
        return ( sort { $a->{$key} <=> $b->{$key} } @all );
    }
}

sub pre_mpi_watch {
    my ($nprocs) = @_;
    my $header = <<EOF;
u: unexpected messages U: unexpected and other messages
s: sending messages r: receiving messages m: sending and receiving
b: Barrier B: Broadcast g: Gather G: AllGather r: reduce: R: AllReduce
a: alltoall A: alltoalls w: waiting
.: consuming CPU cycles ,: using CPU but no queue data -: sleeping *: error
EOF
    print($header);
    my $l = "0";
    for ( my $i = 1 ; $i < $nprocs ; $i++ ) {
        if ( $i % 10 == 0 ) {
            $l .= substr( $i, 0, 1 );
        } elsif ( $i % 5 == 0 ) {
            $l .= "5";
        } else {
            $l .= ".";
        }
    }
    printf("$l\n");
}

# Convert back from a set of values (with ranges) in a namespace to a array of
# ranks containing the values.  Assume that each rank only appears in the
# namespace with one value.
sub array_from_target_namespace {
    my ($r) = @_;

    my @all;
    foreach my $value ( sort( keys( %{$r} ) ) ) {
        while ( defined( my $rank = rng_shift( $r->{$value} ) ) ) {
            $all[$rank] = $value;
        }
    }
    return @all;
}

sub tree_from_namespace {
    my ($r) = @_;

    my %res;

    foreach my $namespace ( keys( %{$r} ) ) {
        foreach my $value ( keys( %{ $r->{$namespace} } ) ) {
            while (
                defined( my $rank = rng_shift( $r->{$namespace}{$value} ) ) )
            {
                $res{$rank}{$namespace} = $value;
            }
        }
    }

    return \%res;
}

sub show_mpi_watch {
    my ( $handle, $lines ) = @_;

    my @all = array_from_target_namespace( $lines->{target_data}{state} );

    my $o = "";
    while ( defined( my $v = shift(@all) ) ) {
        $o .= $v;
    }
    print("$o\n");
}

# Nicely format process information.
# XXX: proc-sort-key should probably sort on column headers as
# well as keys.
# Idealy we'd know what format we wanted and only ask the nodes
# to report relevent info, for now they still report everything.
sub show_proc_format {
    my ( $carg, $nlines ) = @_;

    my @proc_format_array;
    my %proc_format_header;
    my $show_fields = 0;

    my %proc_format_lengths;
    my %proc_header_reverse;

    my $separator = $carg->{column_seperator};

    my @columns = split( ",", $carg->{proc_format} );
    foreach my $column (@columns) {

        $show_fields = 1 if ( $column eq "fields" );

        my ( $name, $desc ) = split( "=", $column );
        if ( defined $desc ) {
            push @proc_format_array, lc($name);
            $proc_format_header{ lc($name) }  = $desc;
            $proc_format_lengths{ lc($name) } = length($desc);
            $proc_header_reverse{ lc($desc) } = lc($name);
        } else {
            push @proc_format_array, lc($column);
            $proc_format_header{ lc($column) }  = $column;
            $proc_format_lengths{ lc($column) } = length($column);
        }
    }

    my @all;

    my $lines = tree_from_namespace( $nlines->{target_data} );
    foreach my $tag ( sort ( keys %$lines ) ) {
        my %hash;
        $hash{vp} = $tag;
        foreach my $key ( keys( %{ $lines->{$tag} } ) ) {

            my $value = $lines->{$tag}{$key};
            next unless defined $proc_format_lengths{$key} or $show_fields;

            if ( length($value) > $proc_format_lengths{$key} ) {
                $proc_format_lengths{$key} = length($value);
            }

            $hash{$key} = $value;

        }
        if ($show_fields) {
            my @fields = sort ( keys(%hash) );
            print "@fields\n";
            exit(0);
        }
        push @all, \%hash;
    }

    # Allow sort keys to be based on column names as well as real keys.
    my $key = lc( $carg->{proc_sort_key} );
    if ( defined $proc_header_reverse{$key} ) {
        $key = $proc_header_reverse{$key};
    }
    @all = sort_proc_hashes( $carg, $key, @all );

    if ( $carg->{proc_show_header} ) {
        my @res;
        foreach my $key (@proc_format_array) {
            my $l .= sprintf( "%-$proc_format_lengths{$key}s",
                $proc_format_header{$key} );
            push @res, $l;
        }
        my $line = join( $separator, @res );
        print "$line\n";
    }
    my $count = $carg->{nprocs_output};
    foreach my $hash (@all) {
        my @res;
        foreach my $key (@proc_format_array) {
            my $value = "??";
            if ( defined $hash->{$key} ) {
                $value = $hash->{$key};
            }
            push @res, sprintf( "%$proc_format_lengths{$key}s", $value );
        }
        my $line = join( $separator, @res );
        print "$line\n";
        if ( defined($count) and ( --$count == 0 ) ) {
            return;
        }
    }
}

# XXX: Now only called when loading things from file.
sub show_results {
    my ( $nlines, $mode, $handle ) = @_;

    my $lines = $nlines->{lines};

    if ( defined $allfns{$mode}{out_handler} ) {
        $allfns{$mode}{out_handler}( undef, $nlines );
        return;
    }

    #if ( $mode eq "stack" or $input_file ) {
    #    if ( $strip_below_main or $strip_above_wait ) {
    #        strip_stack_traces(undef,$lines);
    #    }
    #}

    if ($tree) {
        print show_tree go_p( 0, $lines,
            ( sort { $a <=> $b } ( keys %$lines ) ) );
    } elsif ($compress) {
        foreach my $tag ( sort { $a <=> $b } ( keys %$lines ) ) {
            next if ( !defined( $lines->{$tag} ) );
            my @identical = ();
            foreach my $tag2 ( keys %$lines ) {
                next if ( $tag2 eq $tag );
                if ( cmp_list( \@{ $lines->{$tag} }, \@{ $lines->{$tag2} } ) ) {
                    push( @identical, $tag2 );
                    delete( $lines->{$tag2} );
                }
            }
            print("----------------\n");
            printf( "%s\n", join( ",", compress( @identical, $tag ) ) );
            print("----------------\n");
            foreach my $data ( @{ $lines->{$tag} } ) {
                print("$data\n");
            }
        }
    } elsif ($compress_C) {
        foreach my $tag ( sort { $a <=> $b } ( keys %$lines ) ) {
            print("----------------\n");
            print("$tag\n");
            print("----------------\n");
            foreach my $data ( @{ $lines->{$tag} } ) {
                print("$data\n");
            }
        }
    }
}

###############################################################################
#
# Data collection (parallel and from file).
#
###############################################################################

sub process_line {
    my ( $line, $lines ) = @_;

    if ( $line =~ / *([a-zA-Z]*)\.?([-\d]+):([^\n]+)\n/ ) {
        if ( not $1 ) {
            my $key   = $2;
            my $value = $3;
            if ( $value =~ /raw\:([A-Za-z0-9\+\/\=]*)/ ) {
                push( @{ $lines->{base64}{$key} }, $1 );
            } else {
                push( @{ $lines->{lines}{$key} }, $value );
            }
        } else {
            printf("debug $1.$2: $3\n");
        }
    } else {
        printf("malformed line: $line");
    }
}

sub post_process_lines {
    my $lines = shift;
    return unless exists( $lines->{base64} );
    foreach my $tag ( keys %{ $lines->{base64} } ) {
        $lines->{raw}{$tag} =
          thaw( decode_base64( join( "\n", @{ $lines->{base64}{$tag} } ) ) );
    }
}

sub default_output_handler {
    my ( $req, $d ) = @_;

    my $cargs = $req->{cargs};

    # Warn on missing output here...
    return unless exists $d->{target_output};

    my $lines  = $d->{target_output};
    my $mode   = $req->{mode};
    my $output = "raw";

    $output = $req->{out_format} if defined $req->{out_format};

    if ( $mode eq "stack" or $input_file ) {
        if ( $cargs->{strip_below_main} or $cargs->{strip_above_wait} ) {
            strip_stack_traces( $cargs, $lines );
        }
    }

    if ( $output eq "tree" ) {
        print show_tree go_p( 0, $lines,
            ( sort { $a <=> $b } ( keys %$lines ) ) );

    } elsif ( $output eq "compress" ) {

        foreach my $tag ( sort { $a <=> $b } ( keys %$lines ) ) {
            next if ( !defined( $lines->{$tag} ) );
            my @identical = ();
            foreach my $tag2 ( keys %$lines ) {
                next if ( $tag2 eq $tag );
                if ( cmp_list( \@{ $lines->{$tag} }, \@{ $lines->{$tag2} } ) ) {
                    push( @identical, $tag2 );
                    delete( $lines->{$tag2} );
                }
            }
            print("----------------\n");
            printf( "%s\n", join( ",", compress( @identical, $tag ) ) );
            print("----------------\n");
            foreach my $data ( @{ $lines->{$tag} } ) {
                print("$data\n");
            }
        }
    } elsif ( $output eq "compress_c" ) {
        foreach my $tag ( sort { $a <=> $b } ( keys %$lines ) ) {
            print("----------------\n");
            print("$tag\n");
            print("----------------\n");
            foreach my $data ( @{ $lines->{$tag} } ) {
                print("$data\n");
            }
        }
    } else {
        my $nprocesses = keys( %{ $d->{target_output} } );
        foreach my $process ( sortn( keys( %{ $d->{target_output} } ) ) ) {
            foreach my $line ( @{ $d->{target_output}{$process} } ) {
                if ( $nprocesses == 1 ) {
                    print "$line\n";
                } else {
                    print "$process:$line\n";
                }
            }
        }
    }
}

sub go_file {
    my $file = shift;
    my $mode = shift;

    #if ( $stats_total or $group ) {
    #    my @data;
    #    open( PCMD, "$file" ) or die "$prog: cant open file $file: $!\n";
    #    local $/ = "\n\n";
    #    while (<PCMD>) {
    #        s/\n//g;
    #        push @data, $_;
    #    }
    #    my $s = read_stats(@data);
    #    show_stats($s);
    #    return;
    #}

    open( PCMD, "$file" ) or die "$prog: cant open file $file: $!\n";
    my @data = <PCMD>;
    close(PCMD);

    my %lines;    # A hash of arrays.

    foreach my $line (@data) {
        process_line( $line, \%lines );
    }
    post_process_lines( \%lines );
    show_results( \%lines, $mode, undef );
}

sub rc_status {
    my $status = shift;
    my %rc;

    $rc{'rc'}     = $status >> 8;
    $rc{'core'}   = ( $status & 128 ) >> 7;
    $rc{'signal'} = $status & 127;

    return %rc;
}

sub maybe_clear_screen {
    return unless $watch;
    if ( $conf{watch_clears_screen} ) {
        printf( "%s", " \033[1;1H" );
        printf( "%s", "\033[2J" );
    }
}

sub connect_to_child {
    my ( $host, $port, $word ) = @_;

    my $socket = IO::Socket::INET->new(
        PeerAddr => $host,
        PeerPort => $port,
        Proto    => 'tcp',
    ) or die("Failed to connect to child ($host:$port)");

    print $socket "hello $word\n";

    return $socket;
}

sub my_encode {
    return encode_base64( nfreeze(shift), "" );
}

sub my_decode {
    return thaw( decode_base64(shift) );
}

# We have read data on a socket, process it and call
# any callback.
sub extract_line {
    my ( $handle, $sd ) = @_;

    my $str = $sd->{str};

    # Do this to allow telnet sessions to work.
    $str =~ s/\r//g;

    # Allow multi-line output here, making sure we process each line.
    while ( $str =~ /^([^\n]+)\n/ ) {
        $sd->{line_cb}( $handle, $sd, $1 );
        my $len  = length($1);
        my $flen = length($str);
        if ( ( $len + 1 ) != $flen ) {
            $str = substr( $str, $len + 1, $flen - $len );
        } else {
            $str = "";

        }
        $sd->{str} = $str;
    }

    return;

}

# A simple "ladder" or 1-wide tree
sub generate_comm_tree_ladder {
    my ($a)  = @_;
    my @b    = @{$a};
    my $last = "root";
    my %comm_tree;
    foreach my $c (@b) {
        $comm_tree{$c}{parent} = $last;
        push( @{ $comm_tree{$last}{children} }, $c );
        $last = $c;
    }

    return \%comm_tree;
}

# Fairly simple this, walk through the hosts keeping a list
# of joints (Those able to accept children this iteration) and
# leaves (those able to accept children next iteration) and
# loop until there are no more hosts left to add.
sub generate_binary_tree {
    my ( $a, $width ) = @_;
    my @b    = @{$a};
    my $last = "root";
    my %comm_tree;

    my @leaves;

    my $root = shift( @{$a} );

    my @joints;
    push( @joints, $root );

    $comm_tree{root}{children}[0] = $root;

    while ( @{$a} ) {
        foreach my $joint (@joints) {
            my @children = splice( @{$a}, 0, $width );
            if ( $#children > -1 ) {
                push( @leaves, @children );
                @{ $comm_tree{$joint}{children} } = @children;
            }
        }
        @joints = @leaves;
        @leaves = ();
    }

    return \%comm_tree;
}

# For each remote process generate a tree, giving each
# process a parent and a number of children.
# Currently just make this a simple "ladder" but should
# probably be a f-nomial tree.
sub generate_comm_tree {
    my ($a) = @_;

    return generate_binary_tree( $a, $conf{tree_width} );
}

# Called once when we have the socket details of the last child.
sub connect_to_children {
    my $comm_data = shift;

    debug_log( "signon", undef, "Received last signon, connecting to inner" );

    @{ $comm_data->{host_ids} } = sortn( keys( %{ $comm_data->{remote} } ) );
    $comm_data->{connection_tree} =
      generate_comm_tree( $comm_data->{host_ids} );

    my $td = $comm_data->{connection_tree}->{root}{children}[0];

    debug_log( "ctree", $comm_data->{connection_tree}, "connection tree" );

    my $cdata;
    $cdata->{socket} = connect_to_child(
        $td,
        $comm_data->{remote}{$td}{port},
        $comm_data->{remote}{$td}{key}
    );
    $cdata->{active}   = 1;
    $cdata->{str}      = "";
    $cdata->{fd_desc}  = "child socket";
    $cdata->{line_cb}  = \&command_from_inner;
    $cdata->{eof_cb}   = \&eof_from_fd;
    $cdata->{event_cb} = \&handle_event_from_socket;

    $comm_data->{sockets}{ $cdata->{socket} } = $cdata;
    $comm_data->{sel}->add( $cdata->{socket} );
}

sub issue_command_to_inner {
    my ( $cdata, $cmd ) = @_;
    my $str = my_encode($cmd);
    debug_log( "full_duplex", $cmd, "Sending command to inner, %d bytes",
        length($str) );
    $cdata->{socket}->print("$str\n");
}

sub first_command {
    my $comm_data = shift;

    my $req;
    $req->{mode}            = "signon";
    $req->{connection_tree} = $comm_data->{connection_tree};
    $req->{remote}          = $comm_data->{remote};

    # Also send over some of the per-run (as opposed to per-mode)
    # configuration  options.
    # XXX: Need to send over scripts and other stuff here as well.

    if ( $conf{rmgr} eq "orte" ) {
        $req->{orte_data} = $open_jobs{ $comm_data->{jobid} }{ranks};
    }

    $req->{cinner} = \%cinner;
    $req->{cinner}{jobid} = $comm_data->{jobid};

    return $req;
}

my @commands;

# Push a command onto the list of commands to be executed.
sub push_command {
    my ( $mode, $out_format, $args ) = @_;

    my %cmd;
    $cmd{mode}       = $mode;
    $cmd{out_format} = $out_format if defined($out_format);
    $cmd{args}       = $args if defined($args);
    push @commands, \%cmd;
}

sub next_command {
    my $comm_data = shift;

    if ( $#commands == -1 ) {
        my $req;
        $req->{mode} = "exit";
        return $req;
    }

    my $cmd;

    if ($watch) {
        $cmd = $commands[0];
    } else {
        $cmd = shift(@commands);
    }

    my $req;
    $req->{mode} = $cmd->{mode};

    if ( defined $cmd->{args} ) {
        $req->{cargs} = $cmd->{args};
    }

    # XXX: Should only send this list over if it makes sense, for example
    # the deadlock code only works when targetting all ranks.
    if ( defined $rank_rng ) {
        $req->{ranks} = $rank_rng;
    }

    if ( defined $cmd->{out_format} ) {
        $req->{out_format} = $cmd->{out_format};
    }

    # Send along the secondary args, taking care not to override any that
    # are already defined.
    if ( defined $allfns{ $req->{mode} }{secondary} ) {
        foreach my $sec ( @{ $allfns{ $req->{mode} }{secondary} } ) {
            if ( not defined( $req->{cargs}{ $sec->{arg_long} } ) ) {
                $req->{cargs}{ $sec->{arg_long} } = $sec->{value};
            }
        }
    }

    if ( $conf{verbose} and defined $req->{cargs} ) {
        printf("Mode '$req->{mode}' mode specific flags:\n");
        foreach my $arg ( sort( keys( %{ $req->{cargs} } ) ) ) {
            if ( defined $req->{cargs}{$arg} ) {
                printf( "%20s : '%s'\n", $arg, $req->{cargs}{$arg} );
            } else {
                printf( "%20s : undef\n", $arg );
            }
        }
    }

    return $req;
}

sub report_failed_signon {
    my ( $key, $data ) = @_;
    my %c;
    $c{i} = length($key);
    printf("$key : ranks\n");
    foreach my $value ( sort( keys( %{$data} ) ) ) {
        printf( "%$c{i}s : %s\n",
            $value, rng_convert_to_user( $data->{$value} ) );
    }
}

sub check_signon {
    my ( $comm_data, $data ) = @_;
    return if ( $conf{check_signon} eq "none" );
    my %here;
    while (
        defined( my $proc = rng_shift( $data->{target_data}{found}{yes} ) ) )
    {
        $here{$proc} = 1;
    }
    my $rng = rng_create_empty();

    for ( my $proc = 0 ; $proc < $comm_data->{nprocesses} ; $proc++ ) {
        if ( not defined $here{$proc} ) {
            rng_add_value( $rng, $proc );
        }
    }

    if ( not rng_empty($rng) ) {
        printf( "Warning, failed to locate ranks %s\n",
            rng_convert_to_user($rng) );
    }

    return if ( $conf{check_signon} eq "missing" );

    if ( keys( %{ $data->{target_data}{name} } ) != 1 ) {
        printf("Warning, remote process name differs across ranks\n");
        report_failed_signon( "name", \%{ $data->{target_data}{name} } );
    }

    if ( keys( %{ $data->{target_data}{state} } ) != 1 ) {
        printf("Warning, remote process state differs across ranks\n");
        report_failed_signon( "state", \%{ $data->{target_data}{state} } );
    }

}

my $header_shown = 0;

sub maybe_show_header {
    my ($comm_data) = @_;
    return if ($header_shown);
    my $mode = $comm_data->{current_req}{mode};

    if ( defined $allfns{$mode}{pre_out_handler} ) {
        $allfns{$mode}{pre_out_handler}( $comm_data->{nprocesses} );
    }
    $header_shown = 1;
}

sub format_target_data {
    my ($td) = @_;

    my $ret = "\n";
    foreach my $name ( sort( keys( %{$td} ) ) ) {
        $ret .= "Namespace: \"$name\"\n";
        foreach my $value ( sortn( keys( %{ $td->{$name} } ) ) ) {
            $ret .= "    $value\t";
            $ret .= rng_convert_to_user( $td->{$name}{$value} ) . "\n";
        }
    }
    return $ret;
}

sub command_from_inner {
    my ( $comm_data, $cdata, $line ) = @_;

    # Initial signon from child.
    if ( $line eq "Welcome" ) {
        my $req = first_command($comm_data);
        $comm_data->{current_req} = $req;
        issue_command_to_inner( $cdata, $req );
        return;
    }

    # A reply from inner.
    my $d = my_decode($line);

    debug_log( "full_duplex", $d, "Reply from inner, %d bytes", length($line) );

    # The inner process has signed on.
    if ( $comm_data->{current_req}->{mode} eq "signon" ) {
        $comm_data->{current_req} = next_command($comm_data);
        issue_command_to_inner( $cdata, $comm_data->{current_req} );
        $comm_data->{state} = "live";
        check_signon( $comm_data, $d );
        return;
    }

    # The inner process is about to exit.
    if ( $comm_data->{current_req}->{mode} eq "exit" ) {
        $comm_data->{state} = "shutdown";
        return;
    }

    # We have received a reply to a request, send the next
    # request first and then display this reply.  If in
    # watch mode display the reply, sleep and then send
    # the next request.
    my $req = next_command($comm_data);
    if ( not $watch ) {
        issue_command_to_inner( $cdata, $req );
    }

    if ( defined $d->{target_data} ) {
        debug_log(
            "tdata", $d->{target_data},
            "Target data %s",
            format_target_data( $d->{target_data} )
        );
    }

    maybe_clear_screen();
    maybe_show_header($comm_data);

    # Mode here is the mode for the reply we just got, this
    # may not be the same thing as the request we are currently
    # sending.
    my $mode = $comm_data->{current_req}->{mode};

    if ( defined $d->{target_data}{error} ) {
        printf("Warning: errors reported by some ranks\n========\n");
        foreach my $error ( sort( keys( %{ $d->{target_data}{error} } ) ) ) {
            printf( "%s: $error\n",
                rng_convert_to_user( $d->{target_data}{error}{$error} ) );
        }
        printf("========\n");
    }

    if ( defined( $allfns{$mode}{out_handler} ) ) {
        $allfns{$mode}{out_handler}( $conf{mode_options}{$mode}, $d );
    } else {
        default_output_handler( $comm_data->{current_req}, $d );
    }

    $comm_data->{current_req} = $req;

    if ($watch) {
        sleep( $conf{interval} );
        issue_command_to_inner( $cdata, $req );
    }

    return;
}

sub handle_signon {
    my ( $comm_data, $host, $port, $key ) = @_;

    $comm_data->{remote}{$host}{port} = $port;
    $comm_data->{remote}{$host}{key}  = $key;
    $comm_data->{signons}++;

    if ( $comm_data->{signons} == $comm_data->{nhosts} ) {
        connect_to_children($comm_data);
    }
}

sub hello_from_inner {
    my ( $comm_data, $cdata, $line ) = @_;

    # Children connect back with "Hello $outerkey $hostname $port $innernkey";
    my @words = split( " ", $line );
    if ( $#words != 4 or $words[0] ne "Hello" or $words[1] ne $secret ) {
        printf("Bad signon $line\n");
        return 0;
    }

    handle_signon( $comm_data, $words[2], $words[3], $words[4] );

    if ( $comm_data->{signons} == $comm_data->{nhosts} ) {

        # Don't listen on this port any more;
        $comm_data->{sel}->remove( $comm_data->{listen} );
        $comm_data->{listen}->close();

    }
}

sub inner_stdout_cb {
    my ( $comm_data, $cdata, $line ) = @_;
    my @words = split( " ", $line );
    if ( $#words == 3 and $words[0] eq "connect" ) {

        handle_signon( $comm_data, $words[1], $words[2], $words[3] );
        return;
    } elsif ( $words[0] eq "debug" ) {
        my $count = $comm_data->{sel}->count();
        print("There are $count sockets\n");
        return;
    }
    print("inner: $line\n");
}

sub inner_stderr_cb {
    my ( $comm_data, $cdata, $line ) = @_;
    print("einner: $line\n");
}

sub eof_from_fd {
    my ( $comm_data, $cdata ) = @_;

    if ( $comm_data->{state} ne "shutdown" ) {
        printf("Unexpected EOF from $cdata->{fd_desc} ($comm_data->{state})\n");
    }

    #printf("Expected EOF from $cdata->{fd_desc} ($comm_data->{state})\n");
}

sub handle_event_from_socket {
    my ( $comm_data, $h ) = @_;
    my $cdata = $comm_data->{sockets}{$h};

    my $data;
    my $nb = sysread( $h, $data, 65536 );

    if ( $nb == 0 ) {
        if ( defined( $cdata->{eof_cb} ) ) {
            $cdata->{eof_cb}( $comm_data, $cdata );
        }
        $comm_data->{sel}->remove($h);
        $h->close();
    } else {
        $cdata->{str} .= $data;
        extract_line( $comm_data, $cdata );
    }
}

sub handle_event_from_port {
    my ( $comm_data, $h ) = @_;

    my $new = $h->accept();
    $comm_data->{sel}->add($new);
    my %cdata;
    $cdata{str}                 = "";
    $cdata{line_cb}             = \&hello_from_inner;
    $cdata{event_cb}            = \&handle_event_from_socket;
    $comm_data->{sockets}{$new} = \%cdata;
}

###############################################################################
#
# Range mapping functions.
#
###############################################################################

# A common set of functions for dealing with (integer based) ranges.
#
# Internally a array format is used for speed, functions exist to convert from
# the normal list format "[0-12,15,16]" to the internal one and back again.

# rng_convert_from_user($userrange)
# rc_convert_to_user($range)
#   Convert to and from the normal type to the internal type.

# rng_shift($range)
#   Pop the lowest value off the range.

# rng_add_value($range,$value)
#   Add a value to the range.

# rng_merge($range,$new)
#   Merge two ranges.

# rng_dup($range)
#   Duplicate a range

# rng_create_from_array(@array)
#   Create a range from an array

# rng_create_empty
#   Create a empty range

# rng_empty
#   Test for emptyness.

# Potentially needed but not implemented yet

# rng_user_verify()
# is_value_in_range()
# nvalues_in_range()
# rng_find_missing()
#   Take two ranges and return all that are in the first but not in the
#   second. (see check_signon).

# Convert from a user range to a internal one.
sub rng_convert_from_user {
    my ($range) = @_;

    return undef unless defined $range;
    return undef if $range eq "";
    return undef if $range eq "[]";

    my $newrange;

    if ( $range =~ m/^\[([\d\-\,]+)\]$/ ) {
        $newrange = $1;
    } elsif ( $range =~ m/^(\d+)$/ ) {
        $newrange = $1;
    } else {
        confess("Failed to recognise $range as range\n");
    }

    my @user_parts = split( ",", $newrange );

    my @parts;

    foreach my $part (@user_parts) {
        my %part;
        if ( $part =~ m/^(\d+)$/ ) {
            $part{l} = $1;
            $part{u} = $1;
        } elsif ( $part =~ m/^(\d+)\-(\d+)$/ ) {
            $part{l} = $1;
            $part{u} = $2;
        } else {
            confess("Failed to recognise $part as range\n");
        }
        push( @parts, \%part );
    }
    return \@parts;
}

sub rng_convert_to_user {
    my ($rg) = @_;

    my @entries;
    foreach my $part ( @{$rg} ) {
        if ( $part->{l} == $part->{u} ) {
            push( @entries, $part->{l} );
        } else {
            push( @entries, "$part->{l}-$part->{u}" );
        }
    }
    my $range = join( ",", @entries );
    return "[$range]";
}

sub rng_shift {
    my ($rg) = @_;

    # Return undef if this range is empty.
    return undef if ( $#{$rg} == -1 );

    my $value = $rg->[0]->{l};
    if ( $rg->[0]->{l} == $rg->[0]->{u} ) {
        shift( @{$rg} );
    } else {
        $rg->[0]->{l}++;
    }
    return $value;
}

# Accept duplicate values in this function.
sub rng_add_value {
    my ( $rg, $value ) = @_;

    if ( ref( $rg->[0] ) eq "" ) {
        push( @{$rg}, { 'l' => $value, 'u' => $value } );
        return;
    }

    # If it's after the last value then just add it.
    if ( $value > $rg->[-1]->{u} + 1 ) {
        push( @{$rg}, { 'l' => $value, 'u' => $value } );
        return;
    }

    my $idx = 0;
    foreach my $part ( @{$rg} ) {

        if ( $value == $part->{l} - 1 ) {

            # Extend the current entry downwards.
            $part->{l}--;
            return;
        } elsif ( $value < $part->{l} ) {

            # If it's before the current entry then insert it.
            splice( @{$rg}, $idx, 0, { 'l' => $value, 'u' => $value } );
            return;
        } elsif ( $value == $part->{u} + 1 ) {

            # Extend the current entry upwards.
            $part->{u}++;

            # If we meet the subsequent entry then merge the two.
            if ( exists $rg->[ $idx + 1 ]
                and $part->{u} + 1 == $rg->[ $idx + 1 ]->{l} )
            {
                $part->{u} = $rg->[ $idx + 1 ]->{u};
                splice( @{$rg}, $idx + 1, 1 );
            }
            return;
        } elsif ( $value >= $part->{l} and $value <= $part->{u} ) {
            return;
        }
        $idx++;
    }
    confess("Failed to add value to range");
}

sub rng_merge {
    my ( $rg, $new ) = @_;

    # Need to use defined here as zero is a valid value to store
    # in a range.
    while ( defined( my $val = rng_shift($new) ) ) {
        rng_add_value( $rg, $val );
    }
    return;
}

sub rng_dup {
    my ($rg) = @_;
    return dclone($rg);
}

sub rng_create_from_array {
    my (@r) = @_;

    my $rng = rng_convert_from_user( shift(@r) );
    while ( defined( my $v = shift(@r) ) ) {
        rng_add_value( $rng, $v );
    }
    return $rng;
}

sub rng_create_empty {
    my @r;
    return \@r;
}

sub rng_empty {
    my ($rg) = @_;

    return ( ref( $rg->[0] ) eq "" );
}

sub create_local_port {
    my ($range) = @_;

    my %options = (
        Reuse  => 1,
        Proto  => 'tcp',
        Listen => 2,
    );

    if ( not defined $range ) {
        my $sl = IO::Socket::INET->new(%options)
          or confess("Failed to create local port: $!");
        return $sl;
    }

    my $rg = rng_convert_from_user($range);

    while ( my $port = rng_shift($rg) ) {
        $options{LocalPort} = $port;
        my $sl = IO::Socket::INET->new(%options);
        return $sl if defined $sl;
    }

    die("Failed to create local port, no free ports in range \"$range\"\n");
}

sub go_parallel {
    my $jobid      = shift;
    my $cmd        = shift;
    my $nprocesses = shift;
    my $nhosts     = shift;

    my $comm_data;

    my $sel = IO::Select->new();
    if ( $conf{inner_callback} ) {
        my $sl = create_local_port( $conf{port_range} );

        $comm_data->{listen} = $sl;
        my $port     = $sl->sockport();
        my $hostname = hostname();
        config_set_internal( "outer", "$hostname:$port" );
        $sel->add($sl);

        my %cdata;
        $cdata{event_cb} = \&handle_event_from_port;
        $comm_data->{sockets}{$sl} = \%cdata;
    }

    map { $cmd .= " --$_=\"$cinner_cmd{$_}\"" } keys(%cinner_cmd);

    debug_log( "show_cmd", undef, $cmd );

    my $pcmd = {
        pid => -1,
        in  => "",
        out => *OUT,
        err => *ERR,
    };

    $pcmd->{pid} = open3( $pcmd->{in}, *OUT, *ERR, $cmd )
      or confess "Unable to open3() pcmd: $!\n";

    close $pcmd->{in};

    $comm_data->{nhosts}     = $nhosts;
    $comm_data->{nprocesses} = $nprocesses;
    $comm_data->{cmd}        = $cmd;
    $comm_data->{jobid}      = $jobid;
    $comm_data->{signons}    = 0;

    # State, one of "connecting" "live" and "shutdown";
    $comm_data->{state} = "connecting";

    $sel->add( $pcmd->{out} );
    $sel->add( $pcmd->{err} );

    $comm_data->{sel} = $sel;
    my $start = time();

    my %op;
    $op{str}                              = "";
    $op{line_cb}                          = \&inner_stdout_cb;
    $op{eof_cb}                           = \&eof_from_fd;
    $op{fd_desc}                          = "Inner stdout";
    $op{event_cb}                         = \&handle_event_from_socket;
    $comm_data->{sockets}{ $pcmd->{out} } = \%op;

    my %ep;
    $ep{str}                              = "";
    $ep{line_cb}                          = \&inner_stderr_cb;
    $ep{eof_cb}                           = \&eof_from_fd;
    $ep{fd_desc}                          = "Inner stderr";
    $ep{event_cb}                         = \&handle_event_from_socket;
    $comm_data->{sockets}{ $pcmd->{err} } = \%ep;

    while ( $sel->count() > 1 ) {
        while ( my @live = $sel->can_read(5) ) {
            foreach my $h (@live) {
                if ( defined $comm_data->{sockets}{$h} ) {
                    my $cdata = $comm_data->{sockets}{$h};
                    $cdata->{event_cb}( $comm_data, $h );
                } else {
                    printf("Responce from unknown fd $h\n");
                    exit(1);
                }
            }
        }
        my $t2    = time() - $start;
        my $count = $sel->count();
        if ( $count > 0 ) {

            #printf("Still here, time:$t2 comm_count:$count\n");
            if ( $comm_data->{signons} != $comm_data->{nhosts} ) {
                my $missing = $comm_data->{nhosts} - $comm_data->{signons};
                print("Waiting for signon from $missing hosts.\n");
            }
        }
    }

    waitpid( $pcmd->{pid}, 0 );
    my $res = $?;

    if ( $comm_data->{state} ne "shutdown" ) {
        printf(
"Unexpected exit from parallel command (state=$comm_data->{state})\n"
        );
    }
    printf("result from parallel command is $res (state=$comm_data->{state})\n")
      if ( $conf{verbose} );

    if ( $res != 0 ) {
        my %status = rc_status($res);
        if ( job_is_running($jobid) ) {
            printf(
                "Bad exit code from parallel command (exit_code=$status{rc})\n"
            );
        } else {
            printf("Job $jobid is no longer active\n");
            return 1;
        }
    }

    return 0;
}

sub create_padb_secret {
    my $filename = "$ENV{HOME}/.padb-secret";
    my $FD;
    if ( not open $FD, '>', $filename ) {
        printf("Failed to create secret file: $!\n");
        return;
    }
    if ( chmod( 0600, $filename ) != 1 ) {
        printf("Failed to chmod secret file: $!\n");
        return;
    }
    my $s = rand;
    print {$FD} "secret=$s\n";
    close $FD;
    print("Sucessfully created secret file ($filename)\n");
    return;
}

sub find_padb_secret {

    my $file = "$ENV{HOME}/.padb-secret";
    if ( !-f $file ) {
        printf("No secret file ($file)\n");
        return;
    }
    my (
        $dev,  $ino,   $mode,  $nlink, $uid,     $gid, $rdev,
        $size, $atime, $mtime, $ctime, $blksize, $blocks
    ) = stat($file);

    # Check that the file is mode 100600 (Octal)
    if ( $mode != 33152 ) {
        printf("Wrong permissions on secret file, should be 0600 ($file)\n");
        exit(1);
    }

    open( SFD, $file ) or return;
    my @l = <SFD>;
    close(SFD);
    if ( $#l != 0 ) {
        return;
    }
    if ( $l[0] =~ /^secret=([\d\w\.]+)$/ ) {
        return $1;
    }
    print "Failed to load secret from file ($file)\n";
    exit 1;
}

sub go_job {
    my $jobid = shift;

    if ( defined $rmgr{ $conf{rmgr} }{require_inner_callback}
        and $rmgr{ $conf{rmgr} }{require_inner_callback} )
    {
        $conf{inner_callback} = 1;
    }

    if ( $conf{inner_callback} ) {
        $secret = find_padb_secret();

        if ( not defined $secret ) {
            printf("Error: Could not load secret file on this node\n");
            print "Use --create-secret-file to create one\n";
            exit(1);
        }

    }

    $conf{verbose} && print "Attaching to job $jobid\n";

    $rem_jobid = $jobid;

    # Setup whatever is needed for running parallel commands, note this might
    # involve setting environment variables.
    my ( $cmd, $ncpus, $hosts ) = setup_pcmd($jobid);

    $conf{verbose} && defined $ncpus && print "Job has $ncpus process(es)\n";
    $conf{verbose} && defined $hosts && print "Job spans $hosts host(s)\n";

    debug_log( "verbose", undef, "There are %d processes over %d hosts",
        $ncpus, $hosts );

    $cmd .= " $0 --inner";

    if ( not defined $hosts ) {
        printf("Full duplex mode needs to know the host count\n");
        printf("Which is doesn't for this resource manager: $conf{rmgr}\n");
        return 1;
    }
    my $errors = go_parallel( $jobid, $cmd, $ncpus, $hosts );

    debug_log( "verbose", undef, "Completed command" );

    cleanup_pcmd();
    return $errors;
}

###############################################################################
#
# Outer main
#
###############################################################################

sub cmdline_error {
    my $str = shift;
    print STDERR $str;
    exit(1);
}

sub config_init {
    map { $ic_names{$_}++ } @inner_conf;
    map { $ic_names_cmd{$_}++ } @inner_conf_cmd;
}

sub config_set_internal {
    my ( $key, $value ) = @_;
    if ( exists $conf{$key} ) {
        $conf{$key} = $value;
    } else {
        foreach my $mode ( keys( %{ $conf{mode_options_reverse}{$key} } ) ) {
            $conf{mode_options}{$mode}{$key} = $value;
        }
    }

    # Mark this variable to be passed onto the inner processes.
    if ( defined $ic_names{$key} ) {
        $cinner{$key} = $value;
    }

    if ( defined $ic_names_cmd{$key} ) {
        $cinner_cmd{$key} = $value;
    }
}

sub config_set {
    my ( $key, $value ) = @_;
    printf("Setting '$key' to '$value'\n") if ( $conf{verbose} );

    if ( !exists $conf{$key} and !exists $conf{mode_options_reverse}{$key} ) {
        printf( STDERR
              "Warning, unknown config option '$key' value '$value'.\n" );
    }

    config_set_internal( $key, $value );
}

sub config_from_file {
    my $file = shift;

    printf("Loading config from \"$file\"\n") if ( $conf{verbose} );
    open( CFILE, $file ) or return;

    while (<CFILE>) {
        if (/^([\w-]+)\s*\=\s*(.*)/) {
            my $key   = $1;
            my $value = $2;
            $key =~ s/\-/\_/g;
            config_set( $key, $value );
        }
    }
    close(CFILE);
    return;
}

sub config_from_env {
    printf("Loading config from environment\n") if ( $conf{verbose} );

    foreach my $key ( keys(%conf) ) {
        $key =~ s/\-/\_/g;
        my $name = "PADB_" . uc($key);
        if ( defined $ENV{$name} ) {
            config_set( $key, $ENV{$name} );
        }
    }

    foreach my $key ( keys( %{ $conf{mode_options_reverse} } ) ) {
        $key =~ s/\-/\_/g;
        my $name = "PADB_" . uc($key);
        if ( defined $ENV{$name} ) {
            config_set( $key, $ENV{$name} );
        }
    }
}

sub config_help {
    printf("Current options are:\n");

    my $max_len = 0;

    foreach my $key ( keys(%conf) ) {
        next if ( ref( $conf{$key} ) eq "HASH" );
        if ( length $key > $max_len ) {
            $max_len = length $key;
        }
    }

    foreach my $key ( sort( keys(%conf) ) ) {
        next if ( ref( $conf{$key} ) eq "HASH" );
        my $name = $key;
        $name =~ s/\_/\-/g;
        if ( defined $conf{$key} ) {
            printf( " %$max_len" . "s = '$conf{$key}'\n", $name );
        } else {
            printf( " %$max_len" . "s = unset\n", $name );
        }
    }

    foreach my $mode ( sort( keys( %{ $conf{mode_options} } ) ) ) {
        printf("\nOptions for mode '$allfns{$mode}{arg_long}'\n");
        foreach my $key ( sort( keys( %{ $conf{mode_options}{$mode} } ) ) ) {
            my $name = $key;
            $name =~ s/\_/\-/g;
            if ( defined $conf{mode_options}{$mode}{$key} ) {
                printf(
                    " %$max_len" . "s = '$conf{mode_options}{$mode}{$key}'\n",
                    $name
                );
            } else {
                printf( " %$max_len" . "s = undef\n", $name );

            }
        }
    }
}

sub outer_main {

    my $mode = parse_args_outer();

    if ( getpwnam($user) eq "" ) {
        print STDERR "$prog: Error: no such user as '$user'\n";
        exit 1;
    }

  # Load from the config files first, then the env and finally the command line.

    config_init();

    config_from_file($configfile);

    config_from_file("$ENV{HOME}/.padbrc") unless ( $norc == 1 );

    config_from_env();

    printf("Loading config from command line\n") if ( $conf{verbose} );

    #
    # Once again there is a 'bugette' here, you cant pass the
    # first of these strings through due to the split hacking
    # off everything to the right of the second equals sign
    # however you can do the second.
    # -Oedbopt="--pagesize=8192 --pagesize-header=4096"
    # -Oedbopt="--pagesize 8192 --pagesize-header 4096"
    foreach my $config_option (@config_options) {

        my ( $key, $val ) = split( "=", $config_option );

        my $name = $key;

        $key =~ s/\-/\_/g;

        if ( $key eq "scriptDir" ) {
            printf(
"$prog: -OscriptDir deprecated, use -Oedb=/path/to/edb instead\n"
            );
            exit(1);
        }

        if (    !exists $conf{$key}
            and !exists $conf{mode_options_reverse}{$key} )
        {
            printf("Error, unknown config option '$name'\n");
            config_help();
            exit(1);
        }
        if ( !defined $val ) {
            printf("Error, config option '$name' requires value\n");
            config_help();
            exit(1);
        }
        config_set( $key, $val );
    }

    if ($create_secret) {
        create_padb_secret();
        exit 0;
    }

    if ($list_rmgrs) {
        foreach my $res ( sort( keys %rmgr ) ) {
            my $working = "yes";

            if ( defined $rmgr{$res}{is_installed}
                and not $rmgr{$res}{is_installed}() )
            {
                $working = "no";
            }
            my $r = $res;

            if ( $working eq "yes" ) {
                printf("$r: ");
                my @jobs = $rmgr{$res}{get_active_jobs}($user);
                if ( $#jobs > -1 ) {
                    my $j = join( " ", sortn(@jobs) );
                    printf("jobs($j)\n");
                } else {
                    printf("No active jobs\n");
                }
            } else {
                printf("$r: not active\n");
            }
        }
        exit(0);
    }

    if ($core_stack) {
        if ( not defined $core_name or not defined $exe_name ) {
            printf(
                "Usage $0 --core-stack --core=<corefile> --exe=<executable>\n");
            exit(1);
        }
        if ( not -f $exe_name ) {
            printf("Error: executable file '$exe_name' does not exist!\n");
            exit(1);
        }
        if ( not -f $core_name ) {
            printf("Error: core file '$core_name' does not exist!\n");
            exit(1);
        }
        stack_from_core( $exe_name, $core_name );
        exit(0);
    }

    if ($full_report) {

        find_rmgr();

        if ( not job_is_running($full_report) ) {
            printf( STDERR
"Job $full_report is not active, use --show-jobs to see active jobs\n"
            );
            exit(1);
        }

        printf("padb version $version\n");
        printf("full job report for job $full_report\n\n");

        push_command( "mqueue", "compress" );

        push_command("deadlock");

        my $c = $conf{mode_options}{stack};
        $c->{strip_above_wait} = 0;
        push_command( "stack", "tree", $c );

        go_job($full_report);
        exit(0);
    }

    if ($show_jobs) {
        find_rmgr();
        my @jobids = get_all_jobids($user);
        print("@jobids\n");
        exit(0);
    }

    if ($local_stats) {

        if ($watch) {
            while (1) {
                maybe_clear_screen();
                local_stats();
                sleep( $conf{interval} );
            }
        } else {
            local_stats();
        }
        exit(0);
    }

    if ( $all or $any ) {
        if ( $#ARGV ne "-1" ) {
            cmdline_error(
                "$prog: Error: --all incompatible with specific ids\n");
        }
    } elsif ( !$input_file ) {
        if ( $#ARGV eq "-1" ) {
            cmdline_error(
                "$prog: Error: no jobs specified, use --all or jobids\n");
        }
    }

    if ( ( grep { $_ } ( $any, $all, $input_file ) ) > 1 ) {
        cmdline_error(
            "$prog: Error: only specify one of --all, --any or --input_file,\n"
        );
    }

    my $style_count = ( grep { $_ } ( $compress, $compress_C, $tree ) );
    if ( $style_count > 1 ) {
        cmdline_error(
"$prog: Error: only specify one of --compress, --compress-long or --tree\n"
        );
    }

    if ( not $input_file
        and ( $have_allfns_option != 1 ) )
    {
        cmdline_error(
"$prog: Error: you must specify only one of -x, -S, -s, -g, -q, -X or --kill\n"
        );
    }

    # If delivering a signal check that it's valid.
    if ( defined($mode) and ( $mode eq "kill" ) ) {
        my $signal = uc( $secondary_args{signal} );
        my %sig_names;
        map { $sig_names{$_} = 1 } split( " ", $Config{sig_name} );

        if ( not defined $sig_names{$signal} ) {
            cmdline_error("$prog: Error: signal \"$signal\" is invalid\n");
        }
    }

    if ( $tree and !( ( defined $mode && $mode eq "stack" ) or $input_file ) ) {
        cmdline_error("$prog: Error: --tree only works with --stack-trace\n");
    }

    if ( defined($input_file) ) {
        my $m = "input";
        if ( defined $mode ) {
            $m = $mode;
        }
        go_file( $input_file, $m );
        exit(0);
    }

    my @jobids;

    if ( $any or $all ) {

        find_any_rmgr();

        @jobids = get_all_jobids($user);
        printf( "Active jobs (%d) are @jobids\n", $#jobids + 1 )
          if $conf{verbose};
        if ( $#jobids == -1 ) {
            printf("No active jobs could be found for user '$user'\n");
            exit 1;
        }
        if ( $any && $#jobids != 0 ) {
            printf("More than 1 active job (@jobids) for user '$user'\n");
            exit 1;
        }
    } else {
        find_rmgr();

        foreach my $jobid (@ARGV) {
            if ( job_is_running($jobid) ) {
                push @jobids, $jobid;
            } else {
                printf( STDERR "Job $jobid is not active\n" );
            }
        }
    }

    if ( $#jobids > 0 and $watch ) {
        printf("Cannot use --watch with more than one job\n");
        exit(1);
    }

    foreach my $jobid (@jobids) {

        printf "\nCollecting information for job '$jobid'\n\n"
          if ( $conf{verbose} or ( $#jobids > 0 ) );

        my $of;
        $of = "tree"       if $tree;
        $of = "compress"   if $compress;
        $of = "compress_c" if $compress_C;
        push_command( $mode, $of, $conf{mode_options}{$mode} );
        go_job($jobid);
    }
}

###############################################################################
#
# Inner.
#
###############################################################################

# The code below here used to be in a separate script (padb-helper.pl) but
# it's become apparent that for ease-of-distribution padb works better if it
# is self-contained in one file.  Now we just have a big switch on ARGV[0]
# and either run the inner or outer code depending on if it's set or not.

my %confInner;

sub debug {
    my ( $vp, $str ) = @_;
    $confInner{verbose} or return;
    $vp = -1 unless defined $vp;
    print "$confInner{hostname}.$vp:$str\n";

}

my %inner_output;
my %local_target_data;

sub output {
    my ( $vp, $str ) = @_;

    if ( not defined $str ) {
        carp("no output");
    }

    push( @{ $inner_output{$vp} }, $str );

}

# Report a single string error for a given target rank.
sub target_error {
    my ( $rank, $error ) = @_;
    target_key_pair( $rank, "error", $error );
    return;
}

# Report a single string error for a given target rank.
sub target_key_pair ($$$) {
    my ( $rank, $key, $value ) = @_;

    if ( defined $local_target_data{$key}{$value} ) {
        rng_add_value( $local_target_data{$key}{$value}, $rank );
    } else {
        $local_target_data{$key}{$value} = rng_convert_from_user($rank);
    }
}

sub p_die {
    my ( $vp, $str ) = @_;
    $confInner{verbose}++;
    debug( $vp, "$str, '$@'" );
    exit(1);
}

sub is_parent_resmgr {
    my $pid = shift;
    my $parent_pid = find_from_status( $pid, "PPid" );
    return is_resmgr_process($parent_pid);
}

$SIG{PIPE} = 'IGNORE';

sub gdb_start {
    my ( $exe, $core ) = @_;
    my $gdb = {
        gdbpid   => -1,
        tracepid => -1,
        attached => 0,
        rdr      => "",
        wtr      => "",
        err      => "",
    };

    my $cmd = "gdb --interpreter=mi -q";
    if ( defined $core ) {
        $cmd .= " $exe $core";
    }

    $gdb->{gdbpid} = open3( $gdb->{wtr}, $gdb->{rdr}, $gdb->{err}, $cmd )
      or die "Unable to popen() gdb: $!\n";

    return $gdb;
}

sub gdb_quit {
    my ($gdb) = @_;
    my $result = gdb_send( $gdb, "quit" );
    waitpid( $gdb->{gdbpid}, 0 );
    close( $gdb->{rdr} );
    close( $gdb->{wtr} );
    close( $gdb->{err} );
    return;
}

sub gdb_attach {
    my ( $gdb, $pid ) = @_;

    send_cont_signal($pid);
    my %p = gdb_n_send( $gdb, "attach $pid" );

    if ( not defined $p{status} ) {
        $gdb->{error} = "Failed to attach to process";
        if ( not find_exe("gdb") ) {
            $gdb->{error} = "Failed to attach to process (gdb not installed?)";
        }
        return;
    }

    if ( $p{status} eq "error" ) {
        my $r = gdb_parse_reason( $p{reason} );
        if ( defined $r->{msg} ) {
            $gdb->{error} = "Failed to attach to process: $r->{msg}";
        } else {
            $gdb->{error} = "Failed to attach to process";
        }
        return;
    }

    $gdb->{attached} = 1;
    $gdb->{tracepid} = $pid;

    return $pid;
}

sub gdb_detach {
    my ($gdb) = @_;
    my $result = gdb_send( $gdb, "-target-detach" );

    return if ( $result eq "error" );

    $gdb->{attached} = 0;

    send_cont_signal( $gdb->{tracepid} );
}

sub gdb_wait_for_prompt {
    my ($gdb) = shift;
    my $handle = $gdb->{rdr};
    while (<$handle>) {
        return if /^\(gdb\)/;
    }

    return;
}

sub gdb_n_send {
    my ( $gdb, $cmd ) = @_;
    gdb_wait_for_prompt($gdb);
    my $handle = $gdb->{wtr};
    print $handle "$cmd\n";
    my %r = gdb_n_next_result($gdb);
    $r{cmd} = $cmd;
    return %r;
}

sub gdb_send {
    my ( $gdb, $cmd ) = @_;
    my %p = gdb_n_send( $gdb, $cmd );
    return $p{status};
}

sub strip_square {
    my $str = shift;
    $str =~ /^\[(.*)\]$/;
    return $1;
}

sub strip_soft {
    my $str = shift;
    $str =~ /^\{(.*)\}$/;
    return $1;
}

sub strip_quotes {
    my $str = shift;
    $str =~ /^\"(.*)\"$/;    #"
    return $1;
}

sub strip_leading_comma {
    my $str = shift;
    $str =~ /^,(.*)$/;
    return $1;
}

sub strip_first_quotes {
    my $str = shift;
    $str =~ s/\\\"/REALLYBAD/g;      #"
    $str =~ /^\"([^\"]*)\"(.*)$/;    #"
    my $val      = $1;
    my $leftover = $2;

    $val      =~ s/REALLYBAD/\"/g;      #"
    $leftover =~ s/REALLYBAD/\\\"/g;    #"
    return ( $val, $leftover );

}

# Has to return key (str) value (complex) extra(string)
sub extract_value_square {
    my $str = shift;

    my $left   = "";
    my $right  = $str;
    my $indent = 0;

    while ( $right =~ /^([^\[\]]*)([\[\]])(.*)$/ ) {
        if ( $2 eq "[" ) {
            $indent++;
            $left  = "$left$1\[";
            $right = $3;
        } else {
            $indent--;
            $left  = "$left$1\]";
            $right = $3;
            if ( $indent == 0 ) {
                return ( strip_square($left), $right );
            }
        }

        # printf("$2 $indent\nleft  '$left'\nright '$right'\n\n\n\n");
    }
    printf("ident $indent\n");
}

sub extract_value_soft {
    my $str = shift;

    my $left   = "";
    my $right  = $str;
    my $indent = 0;

    while ( $right =~ /^([^\{\}]*)([\{\}])(.*)$/ ) {
        if ( $2 eq "{" ) {
            $indent++;
            $left  = "$left$1\{";
            $right = $3;
        } else {
            $indent--;
            $left  = "$left$1\}";
            $right = $3;
            if ( $indent == 0 ) {
                return ( strip_soft($left), $right );
            }
        }

        # printf("$2 $indent\nleft  '$left'\nright '$right'\n\n\n\n");
    }
    printf("ident $indent\n");
}

sub new_parse {
    my $str      = shift;
    my $collapse = shift;

    # printf("Parsing\t\t\t\t\t\t$str\n");

    my %res;
    my $key;
    my $value;

    if ( $str =~ /^([\w\-\?]+)\=(.*)$/ ) {
        $key   = $1;
        $value = $2;
    } else {
        $key   = "tuple";
        $value = $str;
    }
    my $leftover;

    # printf("Got key/value pair! $key\n");
    my $type = substr( $value, 0, 1 );
    if ( $type eq "[" ) {
        if ( $value eq "[]" ) {
            my @e;
            return ( $key, \@e, "" );
        }
        my ( $l, $r ) = extract_value_square($value);
        $leftover = $r;

        # printf("Got value\n$l\n$r\n");

        my @b;
        while ( $l ne "" ) {
            my ( $kk, $vv, $c ) = new_parse( $l, $collapse );

            # Assert that $c is empty?
            $l = "";
            if ( $c ne "" ) {
                $c = strip_leading_comma($c);
                $l = $c;
            }
            my %q;
            if ( $kk eq "tuple" or defined $collapse and $kk eq $collapse ) {
                push @b, $vv;
            } else {
                $q{$kk} = $vv;

                # push @b,$vv;
                push @b, \%q;
            }
        }
        return ( $key, \@b, $r );
    } elsif ( $type eq "{" ) {
        my ( $l, $r ) = extract_value_soft($value);
        $leftover = $r;

        # printf("Got value\n$l\n$r\n");

        my @all;
        while ( $l ne "" ) {
            my ( $kk, $vv, $c ) = new_parse( $l, $collapse );

            $l = "";
            if ( $c ne "" ) {
                $c = strip_leading_comma($c);
                $l = $c;
            }
            if ( defined $collapse and $key eq "thread-ids" ) {
                my %r;
                $r{$kk} = $vv;
                push @all, \%r;
            } else {
                $res{$kk} = $vv;
            }
        }
        if ( defined $collapse and $key eq "thread-ids" ) {
            return ( $key, \@all, $r );
        } else {
            return ( $key, \%res, $r );
        }
    } elsif ( $type eq "\"" ) {
        my ( $this, $l ) = strip_first_quotes($value);
        return ( $key, $this, $l );
    } else {
        confess("unknown type '$type' str '$str'");
    }

    return ( $key, \%res, $leftover );

}

sub gdb_parse_reason {
    my $str      = shift;
    my $collapse = shift;

    my $leftover = $str;
    my %res;
    while ( $leftover ne "" ) {
        my ( $key, $value, $l ) = new_parse( $leftover, $collapse );
        $leftover = "";
        if ( $l ne "" ) {
            $leftover = strip_leading_comma($l);
        }
        $res{$key} = $value;
    }
    return \%res;
}
#########################################################################

sub gdb_n_next_result {
    my ($gdb) = shift;
    my $handle = $gdb->{rdr};

    my %res;

    while (<$handle>) {
        return %res if /^\(gdb\)/;
        if (/\~\"(.*)\"\n/) {    #"
            $res{raw} .= $1;
        }
        if (/\&\"(.*)\"\n/) {    #"
            $res{debug} .= $1;
        }
        if (/^\^(done|error),?(.*)$/) {
            $res{status} = $1;
            if ( defined $2 and $2 ne "" ) {
                $res{reason} = $2;

                # $current_parsed = $2;
            }
            if ( defined $res{raw} ) {
                $res{raw} =~ s/\\n/\n/g;
                chomp $res{raw};
            }
            if ( defined $res{debug} ) {
                $res{debug} =~ s/\\n/\n/g;
                chomp $res{debug};
            }
            return %res;
        }
    }
    if ( defined $res{raw} ) {
        $res{raw} =~ s/\\n/\n/g;
        chomp $res{raw};
    }
    if ( defined $res{debug} ) {
        $res{debug} =~ s/\\n/\n/g;
        chomp $res{debug};
    }

    return %res;
}

sub gdb_strip_value {
    my $str = shift;
    $str =~ /value=\"(.+)\"$/;    #"
    return $1;
}

sub gdb_strip_quotes {
    my $str = shift;
    $str =~ /^\"(.*)\"$/;         #"
    return $1;
}

sub gdb_type_size {
    my ( $gdb, $type ) = @_;
    my %p = gdb_n_send( $gdb, "-data-evaluate-expression sizeof($type)" );
    return undef unless ( $p{status} eq "done" );
    return gdb_strip_value( $p{reason} );
}

sub gdb_type_offset {
    my ( $gdb, $type, $field ) = @_;
    my %p =
      gdb_n_send( $gdb, "-data-evaluate-expression \"&(($type *)0)->$field\"" );
    return undef unless ( $p{status} eq "done" );
    return hex( gdb_strip_value( $p{reason} ) );
}

sub gdb_func_addr {
    my ( $gdb, $func ) = @_;
    my %p = gdb_n_send( $gdb, "-data-evaluate-expression $func" );
    return undef unless ( $p{status} eq "done" );
    my $value = gdb_strip_value( $p{reason} );
    my @a     = split( " ", $value );
    my $hex   = $a[-2];
    return $hex;
}

sub gdb_var_addr {
    my ( $gdb, $var ) = @_;
    my %p = gdb_n_send( $gdb, "-data-evaluate-expression &$var" );
    return undef unless ( $p{status} eq "done" );
    $p{reason} =~ /value=\"(.+)\"$/;    #"
    return $1;
}

sub gdb_read_raw {
    my ( $gdb, $ptr, $size ) = @_;

    my @d;
    my $offset = 0;
    my $count  = 256;
    do {
        $count = $size if ( $size < $count );
        my %p =
          gdb_n_send( $gdb, "-data-read-memory -o $offset $ptr x 1 1 $count" );
        $offset += $count;

        return undef unless ( $p{status} eq "done" );
        my $val = gdb_parse_reason( $p{reason}, "thread-ids" );
        push( @d, @{ $val->{memory}[0]{data} } );

    } while ( $offset < $size );
    return @d[ 0 .. $size - 1 ];
}

sub gdb_string {
    my ( $gdb, $strp ) = @_;
    my $offset = 0;
    my $str    = "";
    my @s      = gdb_read_raw( $gdb, $strp, 128 );
    return undef if ( not defined( $s[0] ) );
    foreach my $d (@s) {
        my $v = hex($d);
        return $str if ( $v == 0 );
        $str .= sprintf( "%c", $v );
    }
    return $str;
}

sub handle_query {
    my ( $gdb, $vp, $query, $stats ) = @_;

    my @params = split( " ", $query );
    my $b      = shift @params;
    my $cmd    = shift @params;
    my $res;
    return "fail" unless defined $cmd;
    if ( $cmd eq "size" ) {
        $res = gdb_type_size( $gdb, $params[0] );
        $stats->{size}++;
    } elsif ( $cmd eq "offset" ) {
        $res = gdb_type_offset( $gdb, $params[0], $params[1] );
        $stats->{offset}++;
    } elsif ( $cmd eq "string" ) {
        my $str = gdb_string( $gdb, $params[1] );
        if ( defined $str ) {
            $stats->{string}++;
            $res = $str;
        }
    } elsif ( $cmd eq "func" ) {
        $res = gdb_func_addr( $gdb, $params[0] );
        $stats->{function}++;
    } elsif ( $cmd eq "sym" ) {
        $res = gdb_var_addr( $gdb, $params[0] );
        $stats->{symbol}++;
    } elsif ( $cmd eq "data" ) {
        my @r = gdb_read_raw( $gdb, $params[0], $params[1] );
        if ( defined( $r[0] ) ) {
            $res = "@r";
            $stats->{datareads}++;
            $stats->{databytes} += $params[1];
        }
    } elsif ( $cmd eq "rank" ) {
        $res = $vp;
        $stats->{rank}++;
    } elsif ( $cmd eq "image" ) {
        my $image = readlink("/proc/$gdb->{tracepid}/exe");
        if ( defined $image ) {
            $res = $image;
        }
    } else {
        printf("Unhandled query $query\n");
    }
    if ( defined $res ) {
        return "ok $res";
    }
    $stats->{errors}++;

    return "fail";
}

sub launch_h {
    my ( $gdb, $vp ) = @_;

    my $h = {
        hpid     => -1,
        tracepid => -1,
        attached => 0,
        rdr      => "",
        wtr      => "",
        err      => "",
    };
    my @mq;

    my $cmd = $confInner{minfo};
    $h->{hpid} = open3( $h->{wtr}, $h->{rdr}, $h->{err}, $cmd )
      or confess "Unable to popen() h: $!\n";

    my $handle = $h->{rdr};

    my $out = $h->{wtr};

    my %stats;

    while (<$handle>) {
        my $r = $_;
        chomp $r;
        if ( $r =~ /^req:/ ) {
            my $res = handle_query( $gdb, $vp, $r, \%stats );
            if ( defined $res ) {
                print $out "$res\n";
            }

            # Some things *do* fail here, symbol lookups
            # and we don't need to report it.
            if ( $res eq "fail" ) {
                debug( $vp, "Failed dll request $r\n" );
            }
        } else {
            push( @mq, $r );
        }
    }

    my $sc = keys(%stats);

    waitpid( $h->{hpid}, 0 );
    close( $h->{rdr} );
    close( $h->{wtr} );
    close( $h->{err} );

    if ( $sc == 0 ) {

        # No interaction was had with minfo, abort with nothing.
        target_error( $vp, "Error running $confInner{minfo}: No contact" );
        return undef;
    }

    if ( $? ne 0 ) {

        # Bad exit code but we did talk to it so run with what we have.
        target_error( $vp,
            "Error running $confInner{minfo}: Bad exit code $?" );
    }

    return @mq;
}

# Send a CONT signal to a pid, there have been problems where a program
# is in "T" state which causes the attach to hang forever.  Send the
# process a signal before attaching to wake it up in case this is the case.
# gdb crashing (yes it does happen) is a common case for processes to be
# stopped so always deliver this signal before and after attaching.
sub send_cont_signal {
    my $pid = shift;
    kill( "CONT", $pid );
}

sub fetch_mpi_queue {
    my ( $carg, $vp, $pid ) = @_;
    my $g = gdb_start();
    my $p = gdb_attach( $g, $pid );
    if ( !$p ) {
        if ( defined $g->{error} ) {
            target_error( $vp, $g->{error} );
        } else {
            target_error( $vp, "Failed to attach to process" );
        }
        return;
    }

    my $base = gdb_var_addr( $g, "MPIR_dll_name" );
    if ( !defined $base ) {
        target_error( $vp,
            "Process does not appear to be using MPI (No MPIR_dll_name symbol)"
        );
    }

    if ( defined $carg->{mpi_dll} ) {
        $ENV{MPINFO_DLL} = $carg->{mpi_dll};
    } else {
        if ( !defined $base ) {
            gdb_detach($g);
            gdb_quit($g);
            return;
        }
    }

    my @mq = launch_h( $g, $vp );
    gdb_detach($g);
    gdb_quit($g);
    return @mq;
}

# As above but take a gdb handle
sub fetch_mpi_queue_gdb {
    my ( $carg, $vp, $pid, $g ) = @_;

    my $base = gdb_var_addr( $g, "MPIR_dll_name" );
    if ( !defined $base ) {
        target_error( $vp,
            "Process does not appear to be using MPI (No MPIR_dll_name symbol)"
        );
    }

    if ( defined $carg->{mpi_dll} ) {
        $ENV{MPINFO_DLL} = $carg->{mpi_dll};
    } else {
        if ( !defined $base ) {
            return;
        }
    }

    my @mq = launch_h( $g, $vp );
    return @mq;
}

sub show_mpi_queue {
    my ( $carg, $vp, $pid ) = @_;

    my @mq = fetch_mpi_queue( $carg, $vp, $pid );
    return unless $mq[0];
    foreach my $o (@mq) {
        output( $vp, $o );
    }
}

sub show_mpi_queue_all {
    my ( $carg, $list ) = @_;

    my @all;

    foreach my $proc ( @{$list} ) {
        my $vp  = $proc->{vp};
        my $pid = $proc->{pid};

        debug $vp, "Attaching to $pid";
        my $gdb = gdb_start();
        if ( gdb_attach( $gdb, $pid ) ) {
            $proc->{gdb} = $gdb;
            push( @all, $proc );
        } else {
            if ( defined $gdb->{error} ) {
                target_error( $vp, $gdb->{error} );
            } else {
                target_error( $vp, "Failed to attach to process" );
            }
        }

    }

    foreach my $proc (@all) {

        my $vp  = $proc->{vp};
        my $pid = $proc->{pid};
        my $gdb = $proc->{gdb};

        my @mq = fetch_mpi_queue_gdb( $carg, $vp, $pid, $gdb );
        if ( $mq[0] ) {
            foreach my $o (@mq) {
                output( $vp, $o );
            }
        }
    }

    foreach my $proc (@all) {
        my $gdb = $proc->{gdb};
        gdb_detach($gdb);
        gdb_quit($gdb);
    }
    return;
}

# Ideally handle all this at a higher level...
sub show_mpi_queue_for_deadlock_all {
    my ( $carg, $list ) = @_;

    my $ret;
    my @all;

    foreach my $proc ( @{$list} ) {
        my $vp  = $proc->{vp};
        my $pid = $proc->{pid};

        debug $vp, "Attaching to $pid";
        my $gdb = gdb_start();
        if ( gdb_attach( $gdb, $pid ) ) {
            $proc->{gdb} = $gdb;
            push( @all, $proc );
        } else {
            output $vp, "Failed to attach to to process";
        }

    }

    foreach my $proc (@all) {
        my $tries = 0;

        my @threads;

        my $vp  = $proc->{vp};
        my $pid = $proc->{pid};
        my $gdb = $proc->{gdb};

        my @mq = fetch_mpi_queue_gdb( $carg, $vp, $pid, $gdb );
        $ret->{$vp} = \@mq;
    }

    foreach my $proc (@all) {
        my $gdb = $proc->{gdb};
        gdb_detach($gdb);
        gdb_quit($gdb);
    }
    return $ret;
}

sub go_deadlock_detect {
    my ( $carg, $cd ) = @_;

    my %ad;

    my %tg;

    if ( $#target_groups != -1 ) {
        foreach my $gid (@target_groups) {
            $tg{$gid}++;
        }
    }

    foreach my $process ( keys( %{$cd} ) ) {
        my $rd = $cd->{$process};
        foreach my $g ( keys( %{$rd} ) ) {
            my $gd  = $rd->{$g};
            my $gid = $gd->{id};

            if ( $gd->{size} == 1 ) {
                $gid = "$gd->{id}($process)";
            }
            if ( defined $gd->{ranks}{0} ) {
                $gid = "$gd->{id}($gd->{ranks}{0})";
            }

            if ( $#target_groups != -1 ) {
                next unless defined $tg{$gid};
            }

            if ( $gd->{size} > 0 ) {
                $ad{$gid}{map}[ $gd->{rank} ] = $process;
            }
            $ad{$gid}{size} = $gd->{size};
            $ad{$gid}{name} = $gd->{name};
            foreach my $coll ( keys( %{ $gd->{coll} } ) ) {
                my $count = $gd->{coll}{$coll}{count};
                if ( defined $gd->{coll}{$coll}{active} ) {
                    $ad{$gid}{active}{$coll}++;
                    $ad{$gid}{idents}{ $gd->{rank} }{'active'}{$coll} = $count;
                } else {
                    $ad{$gid}{idents}{ $gd->{rank} }{'inactive'}{$coll} =
                      $count;
                }
            }
        }
    }

    my $ret     = "";
    my $i_count = 0;    # Interesting groups.
                        #foreach my $gid ( sort { $a <=> $b } keys %ad ) {

    foreach my $gid ( sort keys %ad ) {

        if ( $#target_groups != -1 ) {
            next unless defined $tg{$gid};
        }

        my $gstr = "Information for group '$gid' ($ad{$gid}{name})\n";

        # Maybe show the group members, hope that the user doesn't turn
        # this on unless also setting target_groups!
        if ( $carg->{show_group_members} ) {
            $gstr .= "group has $ad{$gid}{size} members\n";
            if ( defined $ad{$gid}{size} ) {
                for ( my $ident = 0 ; $ident < $ad{$gid}{size} ; $ident++ ) {
                    $gstr .=
                      "group member[$ident] => grank[$ad{$gid}{map}[$ident]]\n";
                }
            }
        }

        if ( $ad{$gid}{'active'} ) {
            $i_count++;

            # For all collective calls which we are interested in
            foreach my $s ( keys %{ $ad{$gid}{'active'} } ) {
                my %active;
                my %inactive;

                foreach my $ident ( keys %{ $ad{$gid}{'idents'} } ) {
                    if ( defined $ad{$gid}{'idents'}{$ident}{'active'}
                        and $ad{$gid}{'idents'}{$ident}{'active'}{$s} )
                    {
                        my $number = $ad{$gid}{'idents'}{$ident}{'active'}{$s};
                        push( @{ $active{$number} }, $ident );
                    } elsif ( $ad{$gid}{'idents'}{$ident}{'inactive'}{$s} ) {
                        my $number =
                          $ad{$gid}{'idents'}{$ident}{'inactive'}{$s};
                        push( @{ $inactive{$number} }, $ident );
                    }
                }
                foreach my $number ( sort ( keys %active ) ) {
                    $ret .= $gstr
                      . group_status_helper( "in call $number to $s",
                        0, $ad{$gid}{size}, @{ $active{$number} } );
                    $gstr = "";

                }
                foreach my $number ( sort ( keys %inactive ) ) {
                    $ret .= group_status_helper( "completed call $number to $s",
                        1, $ad{$gid}{size}, @{ $inactive{$number} } );
                }
            }
        } else {
            next unless ( $carg->{show_all_groups} );
            $ret .= $gstr;
            $gstr = "";
        }

        {
            my @inactive;
            foreach my $ident ( sort keys %{ $ad{$gid}{'idents'} } ) {
                if ( not defined $ad{$gid}{'idents'}{$ident}{'active'} ) {
                    push( @inactive, $ident );
                }
            }
            if ( $#inactive != -1 ) {
                $ret .= $gstr
                  . group_status_helper( "not in a call to the collectives",
                    0, $ad{$gid}{size}, @inactive );
                $gstr = "";
            }
        }
    }

    my $count = keys(%ad);

    if ( $count == 1 ) {
        my $use_str = ( $i_count == 1 ) ? "" : " not";
        $ret .= "Total: $count group which is$use_str in use.\n";
    } else {
        my $i_str = ( $i_count == 1 ) ? "is" : "are";
        $ret .= "Total: $count groups of which $i_count $i_str in use.\n";
    }

    return "$ret";
}

sub deadlock_detect {
    my ( $carg, $lines ) = @_;
    my $data;

    # XXX This is a bit of a hack to make the deadlock
    # code work with input files, the whole thing is due
    # a tidy-up on the full-duplex branch where this should
    # be solved properly.
    if ( defined $lines->{target_responce} ) {
        $data = $lines->{target_responce};
    } else {
        $data = $lines->{lines};
    }

    my %coll_data;
    foreach my $rank ( keys( %{$data} ) ) {
        my $r = $data->{$rank};
        my %lid;
        foreach my $line ( @{$r} ) {
            if ( $line =~ /^comm(\d+): (\w+): \'(.*)\'$/ ) {
                $lid{$1}{$2} = $3;
            } elsif ( $line =~ /^comm(\d+): Rank: local (\d+) global (\d+)$/ ) {
                $lid{$1}{ranks}{$2} = $3;
            } elsif ( $line =~
/^comm(\d+): Collective \'(\w+)\': call count (\d+), ([not ]*)active$/
              )
            {
                $lid{$1}{coll}{$2}{count} = $3;
                if ( $4 eq "" ) {
                    $lid{$1}{coll}{$2}{active} = 1;
                }
            } elsif ( $line =~ /^msg\d+/ ) {
                ;    # nop
            } else {
                print("Failed to match minfo output: $line\n");
            }
        }
        $coll_data{$rank} = \%lid;
    }

    my $r = go_deadlock_detect( $carg, \%coll_data );
    print $r;
}

sub gdb_read_value {
    my ( $gdb, $name ) = @_;

    # Quote the request in case it contains spaces.
    my %t = gdb_n_send( $gdb, "-data-evaluate-expression \"$name\"" );
    if ( $t{status} eq "done" ) {
        my $v = gdb_parse_reason( $t{reason} );
        return $v->{value};
    }
    return;
}

sub gdb_expand_vars {
    my ( $gdb, $frame, $type ) = @_;

    foreach my $arg ( @{ $frame->{$type} } ) {

        # Detect simple pointers and deferefence then to show the underlying
        # struct.  Works quite well but is a problem with very large or complex
        # data structures.  More work is required to make this feature viable so
        # leave it disabled for now.  Perhaps have an option for enabling it
        # in a per-type basis?

        if ( ( $arg->{type} =~ m/ \*$/ ) and $arg->{value} ne "0x0" and 0 ) {
            my $value = gdb_read_value( $gdb, "* $arg->{name}" );
            if ( defined $value ) {
                $arg->{value} .= " ($value)";
            }
        }

        # Some variables don't show up a value from list-locals,
        # __FUNCION__ and array pointers are two examples.  For
        # vars where the value isn't given automatically read
        # the value of them directly.
        next if defined $arg->{value};
        my $value = gdb_read_value( $gdb, $arg->{name} );
        if ( defined $value ) {
            $arg->{value} = $value;
        }
    }
}

sub gdb_dump_frames {
    my ( $gdb, $detail ) = @_;
    my %result = gdb_n_send( $gdb, "-stack-list-frames" );
    my $data = gdb_parse_reason( $result{reason}, "frame" );
    if ( not defined $data->{stack} ) {
        return ( { error => $data->{msg} || "unknown error" } );
    }
    if ( defined $detail ) {
        foreach my $frame ( @{ $data->{stack} } ) {
            gdb_send( $gdb, "-stack-select-frame $frame->{level}" );

            my %r = gdb_n_send( $gdb,
                "-stack-list-arguments 2 $frame->{level} $frame->{level}" );
            my $args = gdb_parse_reason( $r{reason}, "name" );

            if ( defined $args->{"stack-args"}[0]{frame}{args} ) {
                my @names = @{ $args->{"stack-args"}[0]{frame}{args} };
                @{ $frame->{params} } = @names;

                gdb_expand_vars( $gdb, $frame, "params" );
            }

            my %s = gdb_n_send( $gdb, "-stack-list-locals --simple-values" );
            if ( $s{status} eq "done" ) {
                my $args = gdb_parse_reason( $s{reason}, "name" );
                if ( defined $args->{locals} ) {
                    $frame->{locals} = $args->{locals};
                    gdb_expand_vars( $gdb, $frame, "locals" );
                }
            }
        }
    }
    return @{ $data->{stack} };
}

sub gdb_dump_frames_per_thread {
    my ( $gdb, $detail ) = @_;
    my @th = ();
    my %result = gdb_n_send( $gdb, "-thread-list-ids" );
    if ( $result{status} ne "done" ) {
        return ("unknown error");
    }
    my $data = gdb_parse_reason( $result{reason}, "thread-ids" );
    if ( not defined $data->{"thread-ids"} ) {
        return ( { error => $data->{msg} || "unknown error" } );
    }
    if ( $data->{"number-of-threads"} == 0 ) {
        my %t;
        $t{id} = 0;
        @{ $t{frames} } = gdb_dump_frames( $gdb, $detail );
        push( @th, \%t );
        return @th;
    }
    foreach my $thread ( @{ $data->{"thread-ids"} } ) {
        my $id = $thread->{"thread-id"};
        my %t;
        $t{id} = $id;
        gdb_send( $gdb, "-thread-select $id" );
        @{ $t{frames} } = gdb_dump_frames( $gdb, $detail );
        push( @th, \%t );
    }
    return @th;
}

# I'm not sure what this is trying to do.
# sub gdb_try_args {
#    my ( $gdb, @frames ) = @_;
#    my %result = gdb_n_send( $gdb, "-stack-list-arguments 0" );
#
#    my $result = $result{reason};
#
#    my ($stack) = ( $result =~ /stack=\[(.+)\]/ );
#    for ( ( $stack =~ /frame=\{([^\}]+)\}/g ) ) {
#        my %d = ();
#        s/\"//g;    #"
#        map { $d{ $$_[0] } = $$_[1] } map { [ split /=/ ] } split(/,/);
#
#        push( @frames, \%d );
#    }
#
#    return @frames;
#}

sub gdb_next_result {
    my ($gdb) = shift;
    my $handle = $gdb->{rdr};

    while (<$handle>) {
        return "" if /^\(gdb\)/;
        return $_ if /^\^(done|error)/;
    }
    return "";
}

sub stack_from_core {
    my $exe  = shift;
    my $core = shift;

    my $gdb = gdb_start( $exe, $core );

    my %e = gdb_n_next_result($gdb);

    my $r = $e{raw};

    $r =~ s/\\n/\n/g;
    $r =~ s/\\"/\"/g;    #"
    $r =~ s/\\\\/\\/g;

    my @r = split( "\n", $r );

    foreach my $l ( split( "\n", $r ) ) {
        next if ( $l =~ m/^done/ );
        next if ( $l =~ m/^Loaded/ );
        next if ( $l =~ m/^Reading/ );
        next if ( $l =~ m/^Using/ );
        print "$l\n";
    }

    # Send a invalid command so the wait_for_prompt in dump_frames... can work.
    # Should probably do this in gdb_start() and return the output somehow.
    my $handle = $gdb->{wtr};
    print $handle "\n";

    print "\n";

    my @threads;
    if ( $conf{stack_shows_params} ) {
        @threads = gdb_dump_frames_per_thread( $gdb, 1 );
    } else {
        @threads = gdb_dump_frames_per_thread($gdb);
    }

  # This code is (almost 100%) lifted from stack_trace_from_pids, could probably
  # factor it out into it's own helper function.
    foreach my $thread ( sort { $a->{id} <=> $b->{id} } @threads ) {
        my @frames = @{ $thread->{frames} };

        printf("ThreadId: $thread->{id}\n") if ( $#threads != 0 );

        for ( my $i = $#frames ; $i >= 0 ; $i-- ) {
            my $frame = $frames[$i];

            printf("ERROR: $$frame{error}\n")
              if exists $$frame{error};

            next unless exists $$frame{level};
            next unless exists $$frame{func};

            if ( $conf{stack_shows_params} ) {
                my @a;
                foreach my $arg ( @{ $frame->{params} } ) {
                    if ( defined $frame->{vals}{$arg} ) {
                        push( @a, "$arg = $frame->{vals}{$arg}" );
                    } else {
                        push( @a, "$arg = ??" );
                    }
                }
                my $a = join( ", ", @a );
                my $file = $frame->{file} || "?";
                my $line = $frame->{line} || "?";
                printf("$frame->{func}($a) at $file:$line\n");

                if ( $conf{stack_shows_locals} ) {
                    foreach my $arg ( @{ $frame->{locals} } ) {
                        if ( defined $frame->{vals}{$arg} ) {
                            printf("  $arg = $frame->{vals}{$arg}\n");
                        } else {
                            printf("  $arg = ??\n");
                        }
                    }
                }
            } else {
                printf( ( $$frame{func} || "?" ) 
                    . "() at "
                      . ( $$frame{file} || "?" ) . ":"
                      . ( $$frame{line} || "?" )
                      . "\n" );
            }
        }
    }

    gdb_quit($gdb);
}

sub run_ptrack_cmd {
    my ( $vp, $pid, $cmd ) = @_;

    debug $vp, "running (p) $cmd";
    my $lines = 0;

    send_cont_signal($pid);
    open( CMD, "$cmd 2>/dev/null|" )
      || p_die( $vp, "cant start command $cmd" );
    while (<CMD>) {
        chomp $_;
        output $vp, $_;
        $lines++;
    }
    send_cont_signal($pid);
    close CMD;
    return $lines;
}

sub run_command {
    my ( $vp, $cmd ) = @_;
    debug $vp, "running $cmd";
    open( CMDS, "$cmd|" ) || p_die $vp, "cant fork subcommand";
    while (<CMDS>) {
        chomp $_;
        output $vp, $_;
    }
    close CMDS;
    debug $vp, "Finished $cmd";
}

sub get_remote_env {
    my $pid = shift;

    my %env;

    local $/ = "\0";
    open( FD, "/proc/$pid/environ" ) or return undef;
    while (<FD>) {
        chomp;
        my @f   = split "=";
        my $key = $f[0];
        shift @f;
        $env{$key} = join( "=", @f );
    }
    close FD;
    return %env;
}

# Load the data about a given RMS job id,
# return a array of hashes
sub load_rms_procs {
    my $jobId = shift;

    if ( not open PIDFILE, "/proc/rms/programs/$jobId/pids" ) {

        # This is actually perfectly legitimate, it's because you
        # can do for example allocate -N4 prun -N2 <app>.  Because
        # of the way prun -T works (across a resource) not having
        # a pids file isn't always a bad thing.
        #
        # Of course it could mean that whatever jobs were supposed
        # to be running on this node aren't.
        debug undef, "Cannot open /proc/rms/programs/$jobId/pids";
        return;
    }

    my @procs;

    while (<PIDFILE>) {
        my ( $pid, $vp ) = split(' ');
        my %process;
        $process{pid} = $pid;
        if ( defined $vp and $vp != -1 ) {

            # Modern versions of RMS do the pid to vp translation for
            # us but report all unknown pids as -1.  Unknown in this
            # case means the rmsloaders and any processes which haven't
            # called elan_baseInit()
            $process{vp} = $vp;
        }
        push @procs, \%process;
    }
    close(PIDFILE);
    return @procs;
}

sub show_task_file {
    my ( $vp, $file, $prefix ) = @_;
    return unless ( -f $file );
    open( FD, "$file" ) or return;
    my @all = <FD>;
    close FD;
    foreach my $l (@all) {
        chomp $l;
        if ( defined $prefix ) {
            proc_output( $vp, $prefix, $l );
        } else {
            my ( $key, $value ) = split( ":", $l );
            $value =~ s/^[ \t]+//g;
            proc_output( $vp, $key, $value );
        }
    }
}

sub show_task_stat_file {
    my ( $vp, $file ) = @_;
    my @stat_names =
      qw(pid comm state ppid pgrp session tty_nr tpgid flags minflt
      cminflt majflt cmajflt utime stime cutime cstime priority nice
      num_threads itrealvalue starttime vsize rss rlim startcode endcode
      startstack kstkesp kstkeip signal blocked sigignore sigcatch wchan
      nswap cnswap exit_signal processor rt_ptiority policy
      delayacct_blkio_ticks guest_time cguest_time);
    return unless ( -f $file );
    open( FD, "$file" ) or return;
    my @all = <FD>;
    close FD;

    foreach my $l (@all) {
        chomp $l;
        my @stats = split( / /, $l );
        for ( my $i = 0 ; $i <= $#stats ; $i++ ) {
            proc_output( $vp, "stat.$stat_names[$i]", $stats[$i] );
        }

    }
}

sub show_task_dir {
    my ( $carg, $vp, $pid, $dir ) = @_;

    if ( $carg->{proc_shows_proc} ) {
        my $exe = readlink "$dir/exe";
        if ( defined $exe ) {
            proc_output( $vp, "exe", $exe );
        }

        show_task_file( $vp, "$dir/status" );
        show_task_file( $vp, "$dir/wchan", "wchan" );
        show_task_file( $vp, "$dir/stat", "stat" );
        if ( $carg->{proc_shows_stat} ) {
            show_task_stat_file( $vp, "$dir/stat" );
        }

        if ( -f "$dir/maps" ) {
            open( MAP, "$dir/maps" );
            my @map = (<MAP>);
            close(MAP);
            my %totals;
            foreach my $rgn (@map) {
                my ( $area, $perm, $offset, $time, $inode, $file ) =
                  split( " ", $rgn );
                if ( $file =~ '/dev/elan4/sdram(\d+)' ) {
                    my $rail = $1;
                    my ( $start, $end ) = split( "-", $area );
                    my $s     = _hex("0x$start");
                    my $e     = _hex("0x$end");
                    my $delta = $e - $s;
                    if ( defined $totals{$rail} ) {
                        $totals{$rail} += $delta;
                    } else {
                        $totals{$rail} = $delta;
                    }
                }
            }
            foreach my $rail ( sort keys %totals ) {
                my $total = $totals{$rail} / 1024;
                proc_output( $vp, "sdram$rail", "$total kb" );
            }
        }
    }

    if ( $carg->{proc_shows_fds} ) {
        opendir( FDS, "$dir/fd" );
        my @fds = readdir(FDS);
        closedir(FDS);
        my @all_fddata;
        foreach my $fd (@fds) {
            next if ( $fd eq "." );
            next if ( $fd eq ".." );
            my $target = readlink "$dir/fd/$fd";

            my %fdhash;
            $fdhash{target} = $target;
            $fdhash{fd}     = $fd;

            # New fdinfo data, it's verbose so only enable it
            # if requested by -O proc-shows-fds=full
            if ( $carg->{proc_shows_fds} eq "full" ) {
                if ( -f "$dir/fdinfo/$fd" ) {
                    open( FDI, "$dir/fdinfo/$fd" );
                    my @fdi = (<FDI>);
                    close FDI;
                    foreach my $fdi (@fdi) {
                        chomp($fdi);
                        my ( $key, $value ) = split( ":", $fdi );
                        $value =~ s/\t//g;
                        $fdhash{$key} = $value;
                    }
                }
            }
            push( @all_fddata, \%fdhash );
        }
        foreach my $fd (@all_fddata) {
            if ( defined $fd->{pos} ) {
                proc_output( $vp, "fd$fd->{fd}",
                    "$fd->{target} \($fd->{pos} $fd->{flags}\)" );
            } else {
                proc_output( $vp, "fd$fd->{fd}", $fd->{target} );
            }
        }
    }
    if ( $carg->{proc_shows_maps} ) {
        show_task_file( $vp, "$dir/maps", "maps" );
    }
}

# Convert the first line of /proc/stat to elapsed jiffies.
sub string_to_jiffies {
    my ($ps) = @_;

    my @usecc = split( " ", $ps );

    my $jiffies = 0;

    # Remove the "cpu" prefix.
    shift(@usecc);
    foreach my $usecv (@usecc) {
        $jiffies += $usecv;
    }
    return $jiffies;
}

sub add_and_divide_jiffies {
    my ( $pre, $post ) = @_;

    my $jiffies;

    my @pre = split( " ", $pre );

    return ( ( string_to_jiffies($pre) + string_to_jiffies($post) ) / 2 );
}

sub pcpu_user {
    my ( $cpucount, $elapsed, $start, $end ) = @_;
    my @pre  = split( " ", $start );
    my @post = split( " ", $end );
    my $jused = $post[13] - $pre[13];
    my $used  = ( $jused / $elapsed ) * $cpucount * 100;
    return sprintf( "%d", $used );
}

sub pcpu_sys {
    my ( $cpucount, $elapsed, $start, $end ) = @_;
    my @pre  = split( " ", $start );
    my @post = split( " ", $end );
    my $jused = $post[14] - $pre[14];
    my $used  = ( $jused / $elapsed ) * $cpucount * 100;
    return sprintf( "%d", $used );
}

sub pcpu_total {
    my ( $cpucount, $elapsed, $start, $end ) = @_;
    my @pre  = split( " ", $start );
    my @post = split( " ", $end );
    my $jused = $post[13] - $pre[13] + $post[14] - $pre[14];
    my $used  = ( $jused / $elapsed ) * $cpucount * 100;
    return sprintf( "%d", $used );
}

my %proc_keys;

sub proc_output {
    my ( $vp, $key, $value ) = @_;
    if ( $confInner{mode} eq "proc_summary" ) {
        if ( defined $proc_keys{ lc($key) } ) {
            target_key_pair( $vp, lc($key), $value );
        }
    } else {
        output( $vp, "$key: $value" );
    }
}

sub show_proc_all {
    my ( $carg, $list ) = @_;

    %proc_keys = ();

    if ( defined $carg->{proc_format} ) {
        my @columns = split( ",", $carg->{proc_format} );
        foreach my $column (@columns) {
            my ( $name, $desc ) = split( "=", $column );
            $proc_keys{ lc($name) } = 1;
        }
    }

    my @all;

    my $jiffies_start;
    my $load_avg;
    if ( $carg->{proc_shows_proc} ) {
        foreach my $proc ( @{$list} ) {
            my $pid = $proc->{pid};
            open( $proc->{handle}, "/proc/$pid/stat" );
        }

        open( SFD, "/proc/stat\n" );

        # Begin critical path.
        my $stat = <SFD>;

        foreach my $proc ( @{$list} ) {
            my $pid = $proc->{pid};
            my $h   = $proc->{handle};
            $proc->{stat_start} = <$h>;
            seek( $proc->{handle}, 0, 0 );
        }

        seek( SFD, 0, 0 );
        my $stat2 = <SFD>;

        # End critical path.

        $jiffies_start = add_and_divide_jiffies( $stat, $stat2 );
        open( LFD, "/proc/loadavg" );
        $load_avg = <LFD>;
        close LFD;
    }

    foreach my $proc ( @{$list} ) {
        my $vp  = $proc->{vp};
        my $pid = $proc->{pid};
        show_proc( $carg, $vp, $pid );
    }

    if ( $carg->{proc_shows_proc} ) {
        sleep(1);

        seek( SFD, 0, 0 );

        # Begin critical path.
        my $stat = <SFD>;

        foreach my $proc ( @{$list} ) {
            my $pid = $proc->{pid};
            my $h   = $proc->{handle};
            $proc->{stat_end} = <$h>;
            close( $proc->{handle} );
        }

        seek( SFD, 0, 0 );
        my $stat2 = <SFD>;

        # End critical path.

        my $cpucount = 0;
        while (<SFD>) {
            if ( $_ =~ /^cpu\d/ ) {
                $cpucount++;
            }
        }
        close(SFD);

        my $jiffies_end = add_and_divide_jiffies( $stat, $stat2 );

        my $elapsed = $jiffies_end - $jiffies_start;

        my ( $l1, $l5, $l15 ) = split( " ", $load_avg );

        foreach my $proc ( @{$list} ) {
            my $vp = $proc->{vp};

            proc_output(
                $vp, "pcpu",
                pcpu_total(
                    $cpucount,           $elapsed,
                    $proc->{stat_start}, $proc->{stat_end}
                )
            );
            proc_output(
                $vp, "pucpu",
                pcpu_user(
                    $cpucount,           $elapsed,
                    $proc->{stat_start}, $proc->{stat_end}
                )
            );
            proc_output(
                $vp, "pscpu",
                pcpu_sys(
                    $cpucount,           $elapsed,
                    $proc->{stat_start}, $proc->{stat_end}
                )
            );
            proc_output( $vp, "load1",  $l1 );
            proc_output( $vp, "load5",  $l15 );
            proc_output( $vp, "load15", $l15 );
        }
    }

    return;
}

sub show_proc {
    my ( $carg, $vp, $pid ) = @_;

    if ( $carg->{proc_shows_proc} ) {
        proc_output( $vp, "hostname", $confInner{hostname} );
    }

    if ( -d "/proc/$pid/task" and $carg->{proc_shows_proc} ) {

        my $threads = 0;

        # 2.6 kernel. (ntpl)
        opendir( DIR, "/proc/$pid/task" );
        my @tasks = readdir(DIR);
        closedir(DIR);
        foreach my $task (@tasks) {
            next if ( $task eq "." );
            next if ( $task eq ".." );
            show_task_dir( $carg, $vp, $pid, "/proc/$pid/task/$task" );
            $threads++;
        }
        proc_output( $vp, "threads", $threads );
    } else {
        show_task_dir( $carg, $vp, $pid, "/proc/$pid" );
    }
}

sub show_vars {
    my ( $vp, $frame, $type ) = @_;
    my %l;
    $l{t} = 0;
    $l{n} = 0;
    return unless defined( $frame->{$type} );
    return if ( @{ $frame->{$type} } == 0 );
    foreach my $arg ( @{ $frame->{$type} } ) {
        $l{t} = length( $arg->{type} ) if ( length( $arg->{type} ) > $l{t} );
        $l{n} = length( $arg->{name} ) if ( length( $arg->{name} ) > $l{n} );
    }
    my $header = sprintf("  $type:");
    output( $vp, $header );
    foreach my $arg ( @{ $frame->{$type} } ) {
        my $value = ( defined $arg->{value} ? $arg->{value} : "??" );
        my $output =
          sprintf( "  %-$l{t}s %$l{n}s = $value", $arg->{type}, $arg->{name} );
        output( $vp, $output );
    }
}

# Try and be clever here, attach to each and every process on this node first,
# then go back and query them each in turn, should mean that some processes are
# not spinning whilst gdb is doing it's thing which will mean a quicker runtime
# but also that the resulting stack traces will have less artifacts because running
# processes bunch up behind the non-running ones.
sub stack_trace_from_pids {
    my ( $carg, $list ) = @_;

    my @all;

    foreach my $proc ( @{$list} ) {
        my $vp  = $proc->{vp};
        my $pid = $proc->{pid};

        debug $vp, "Attaching to $pid";
        my $gdb = gdb_start();
        if ( gdb_attach( $gdb, $pid ) ) {
            $proc->{gdb} = $gdb;
            push( @all, $proc );
        } else {
            if ( defined $gdb->{error} ) {
                target_error( $vp, $gdb->{error} );
            } else {
                target_error( $vp, "Failed to attach to process" );
            }
        }

    }

    foreach my $proc (@all) {
        my $tries = 0;

        my @threads;

        my $vp  = $proc->{vp};
        my $pid = $proc->{pid};

        my $ok;
        do {
            debug $vp, "try $tries";
            my $gdb;

            if ($tries) {
                debug $vp, "Re-attaching to $pid, $tries";
                send_cont_signal($pid);
                sleep(1);
                my $g = gdb_start();
                if ( gdb_attach( $g, $pid ) ) {
                    $gdb = $g;
                } else {
                    if ( defined $gdb->{error} ) {
                        target_error( $vp, $gdb->{error} );
                    } else {
                        target_error( $vp, "Failed to attach to process" );
                    }
                }
            } else {
                $gdb = $proc->{gdb};
            }

            $ok = 0;
            if ( defined $gdb ) {
                if (   $carg->{stack_shows_params}
                    or $carg->{stack_shows_locals} )
                {
                    @threads = gdb_dump_frames_per_thread( $gdb, 1 );
                } else {
                    @threads = gdb_dump_frames_per_thread($gdb);
                }
                gdb_detach($gdb);
                gdb_quit($gdb);
                if ( defined $threads[0]->{frames} ) {
                    my @frames = @{ $threads[0]->{frames} };

                    $ok = 1;
                    $ok = 0
                      unless ( defined $frames[$#frames]{func}
                        and $frames[$#frames]{func} eq "main" );
                } else {
                    $ok = 0;
                }
            }
            $tries++;
          } while ( ( $ok != 1 )
            and ( $tries < $carg->{gdb_retry_count} ) );

        if ( not defined $threads[0]{id} ) {
            output( $vp, "Could not extract stack trace from application" );
            return;
        }

        if ( defined $threads[0]{error} ) {
            output( $vp, $threads[0]{error} );
            return;
        }

        foreach my $thread ( sort { $a->{id} <=> $b->{id} } @threads ) {
            next unless defined $thread->{frames};
            my @frames = @{ $thread->{frames} };

            output( $vp, "ThreadId: $thread->{id}" ) if ( $#threads != 0 );

            for ( my $i = $#frames ; $i >= 0 ; $i-- ) {
                my $frame = $frames[$i];

                output( $vp, "ERROR: $$frame{error}" )
                  if exists $$frame{error};

                next unless exists $$frame{level};
                next unless exists $$frame{func};

                output( $vp,
                        ( $$frame{func} || "?" ) 
                      . "() at "
                      . ( $$frame{file} || "?" ) . ":"
                      . ( $$frame{line} || "?" ) );
                if ( $carg->{stack_shows_params} ) {
                    show_vars( $vp, $frame, "params" );
                }
                if ( $carg->{stack_shows_locals} ) {
                    show_vars( $vp, $frame, "locals" );
                }

            }
        }
    }
    return;
}

sub kill_proc {
    my ( $cargs, $vp, $pid ) = @_;
    my $signal = uc( $cargs->{signal} );
    kill( $signal, $pid );
    return;
}

sub ping_rank {
    my ( $cargs, $vp, $pid ) = @_;
    target_key_pair( $vp, "PING", "ACK" );
    output( $vp, "ACK" );
    return;
}

sub show_queue {
    my ( $carg, $vp, $pid ) = @_;

    # Nobble the LD_LIBRARY_PATH to give etrace the best chance of working.
    my %remote_env = get_remote_env($pid);

    if ( defined $remote_env{LD_LIBRARY_PATH} ) {
        $ENV{LD_LIBRARY_PATH} = "$remote_env{LD_LIBRARY_PATH}:$confInner{myld}";
    }

    my $lines = run_ptrack_cmd( $vp, $pid,
        "$confInner{edb} --queues --pid=$pid $confInner{edbopt}" );

    return if ( $lines != 0 );

    show_mpi_queue( $carg, $vp, $pid );
    return;
}

sub show_clever_full_stack {
    my ( $vp, $pid ) = @_;

    my $gdb = gdb_start();

    if ( gdb_attach( $gdb, $pid ) ) {
        my @threads = gdb_dump_frames_per_thread( $gdb, 1 );
        gdb_detach($gdb);
        gdb_quit($gdb);

        foreach my $thread ( sort { $a->{id} <=> $b->{id} } @threads ) {
            my @frames = @{ $thread->{frames} };
            for ( my $i = $#frames ; $i >= 0 ; $i-- ) {
                my $frame = $frames[$i];

                my @a;
                foreach my $arg ( @{ $frame->{params} } ) {
                    if ( defined $frame->{vals}{$arg} ) {
                        push( @a, "$arg = $frame->{vals}{$arg}" );
                    } else {
                        push( @a, "$arg = ??" );
                    }
                }
                my $a = join( ", ", @a );
                my $file = $frame->{file} || "?";
                my $line = $frame->{line} || "?";
                output( $vp, "$frame->{func}($a) at $file:$line" );

                my $show_locals = 0;
                if ($show_locals) {
                    foreach my $arg ( @{ $frame->{locals} } ) {
                        if ( defined $frame->{vals}{$arg} ) {
                            output( $vp, "  $arg = $frame->{vals}{$arg}" );
                        } else {
                            output( $vp, "  $arg = ??" );
                        }
                    }
                }
            }
        }
    }
}

sub show_full_stack {
    my ( $vp, $pid, $file ) = @_;
    run_ptrack_cmd( $vp, $pid, "gdb -batch -x $file -p $pid" );
}

sub show_full_stacks {
    my ( $carg, $list ) = @_;

    if (0) {

        # -x does this, just do what we used to.
        foreach my $proc ( @{$list} ) {
            show_clever_full_stack( $proc->{vp}, $proc->{pid} );
        }
        return;
    }

    my ( $fh, $file ) = tempfile("/tmp/padb.XXXXXXXX");
    print $fh "where full\n";
    print $fh "detach\n";
    close $fh;

    foreach my $proc ( @{$list} ) {
        show_full_stack( $proc->{vp}, $proc->{pid}, $file );
    }

    unlink($file);
    return;
}

sub set_debug {
    my ( $carg, $vp, $pid ) = @_;
    run_command( $vp,
        "edb --key $confInner{key} --debug=$carg->{dflag} --target-vp=$vp" );
}

my $mpi_watch_data = <<EOF;
Barrier,b,elan_gsync,elan_hgsync,PMPI_Barrier,MPI_Barrier,shmem_barrier
Broadcast,B,elan_hbcast,elan_bcast,PMPI_Bcast,MPI_Bcast,shmem_bcast
AllGather,G,PMPI_Allgather,MPI_ALLgather
Gather,g,elan_gather,PMPI_Gather,MPI_Gather
AllReduce,R,PMPI_Allreduce,MPI_Allreduce
Reduce,r,elan_reduce,PMPI_Reduce,MPI_Reduce
alltoall,a,elan_alltoall,PMPI_Alltoall,MPI_Alltoall
alltoalls,A,elan_alltoalls
wait,w,elan_wait
EOF

# Load a file for use in MPI_Watch.
sub mpi_watch_load {
    my ($carg) = @_;

    # File is a csv file,
    # Name,c,function1,function2

    if ( defined $carg->{mpi_watch_file} ) {
        my %fns;
        my $f = $carg->{mpi_watch_file};
        open( MW, $f ) or return;
        my @d = (<MW>);
        close(MW);

        foreach my $mode (@d) {
            chomp $mode;
            my ( $name, $char, @fns ) = split( ",", $mode );
            $fns{names}{$name} = $char;
            foreach my $fn (@fns) {
                $fns{fns}{$fn} = $name;
            }
        }
        return \%fns;
    }

    my %fns;
    foreach my $mode ( split( "\n", $mpi_watch_data ) ) {
        chomp $mode;
        my ( $name, $char, @fns ) = split( ",", $mode );
        $fns{names}{$name} = $char;
        foreach my $fn (@fns) {
            $fns{fns}{$fn} = $name;
        }
    }
    return \%fns;
}

# Legend:
#
# u - unexpected messages
# U - unexpected and other messages
# s - send messages only
# r - receive messages only
# m - send and receive messages
# . - no messages, consuming CPU
# - - sleeping
#
# * - error.

sub mpi_watch_all {

    my ( $carg, $list ) = @_;
    my $fns = mpi_watch_load($carg);

    foreach my $proc ( @{$list} ) {
        my $vp  = $proc->{vp};
        my $pid = $proc->{pid};

        # Load the status now before we attach with GDB,
        # otherwise we'll just see it as "T" (Stopped).
        my $m = find_from_status( $pid, "State" );
        target_key_pair( $vp, "proc_state", $m );
        if ( $m eq "R" ) {
            $m = ",";
        } elsif ( $m eq "S" ) {
            $m = "-";
        } else {
            $m = "*";
        }
        $proc->{state} = $m;

        my $gdb = gdb_start();
        if ( gdb_attach( $gdb, $pid ) ) {
            $proc->{gdb} = $gdb;
        } else {
            if ( defined $gdb->{error} ) {
                target_error( $vp, $gdb->{error} );
            } else {
                target_error( $vp, "Failed to attach to process" );
            }
        }
    }

    foreach my $proc ( @{$list} ) {
        my $vp  = $proc->{vp};
        my $pid = $proc->{pid};

        if ( not defined $proc->{gdb} ) {
            target_key_pair( $vp, "state", $proc->{state} );
            next;
        }

        my $gdb = $proc->{gdb};

        my @mq;
        my $sm   = 0;
        my $rm   = 0;
        my $um   = 0;
        my $good = ".";
        my $fnmode;

        @mq = fetch_mpi_queue_gdb( $carg, $vp, $pid, $gdb );

        if ( $#mq == 0 ) {
            $good = ",";
        } else {
            foreach my $o (@mq) {
                if ( $o =~ /Operation (\d)/ ) {
                    my $type = $1;
                    $sm++ if ( $type == 0 );
                    $rm++ if ( $type == 1 );
                    $um++ if ( $type == 2 );
                }
            }
        }

        my $mt = ( grep { $_ } ( $sm, $rm, $um ) );
        if ( $mt != 0 ) {
            my $mode = "*";

            if ($um) {
                $mode = "u";
                $mode = "U" if ( $mt != 1 );
            } else {
                if ( $mt == 1 ) {
                    $mode = "s" if ($sm);
                    $mode = "r" if ($rm);
                } else {
                    $mode = "m";
                }
            }
            target_key_pair( $vp, "state", $mode );
            next;
        }

        my @threads = gdb_dump_frames_per_thread($gdb);

        foreach my $thread ( sort { $a->{id} <=> $b->{id} } @threads ) {
            my @frames = @{ $thread->{frames} };
            for ( my $i = $#frames ; $i >= 0 ; $i-- ) {
                my $frame = $frames[$i];
                if ( defined $fns->{fns}{ $frame->{func} } ) {
                    $fnmode = $fns->{fns}{ $frame->{func} };
                    last;
                }
            }
        }

        if ( defined $fnmode ) {
            target_key_pair( $vp, "state", $fns->{names}{$fnmode} );
            next;

        }

        # Fall through case.
        target_key_pair( $vp, "state", $proc->{state} );
    }

    foreach my $proc ( @{$list} ) {
        if ( $proc->{gdb} ) {
            gdb_detach( $proc->{gdb} );
            gdb_quit( $proc->{gdb} );
        }
    }

}

sub maybe_show_pid {
    my ( $vp, $pid ) = @_;

    debug( $vp, "maybe_show_pid vp $vp, pid: $pid" );

    $confInner{rmpids}{$pid}{rank} = $vp;
}

sub find_from_status {
    my $pid = shift;
    my $key = shift;

    open( PCMD, "/proc/$pid/status" ) or return;
    while (<PCMD>) {
        my $l = $_;
        if ( $l =~ /$key:\t+(\w+)/ ) {
            close PCMD;
            return $1;
        }
    }
    close PCMD;
    return;
}

sub is_resmgr_process {
    my $pid  = shift;
    my $name = find_from_status( $pid, "Name" );
    my $mgrs = { 'rmsloader' => 1, 'slurmd' => 1, 'slurmstepd' => 1 };
    return 1 if ( defined $mgrs->{$name} );
}

# Report the pids as reported by slurm, don't worry about tracing children or
# anything at this stage.
sub slurm_find_pids {
    my $jobid = shift;

    my @procs =
      `scontrol listpids $jobid.$confInner{slurm_job_step} 2>/dev/null`;
    return undef if ( $? != 0 );
    foreach my $proc (@procs) {
        my ( $pid, $job, $step, $local, $global ) = split( " ", $proc );
        next if ( $global eq "-" );
        next unless ( $job eq $jobid );
        next unless ( $step == $confInner{slurm_job_step} );
        maybe_show_pid( $global, $pid );
    }
}

# Local processes per node, i.e. no resource manager support, we only
# have one process in this case so call it process 0.
sub local_find_pids {
    my $pid = shift;

    maybe_show_pid( 0, $pid );
}

sub mpd_find_pids {
    my $job = shift;
    my $d   = mpd_get_data();

    my $j = $d->{$job}{pids}{ $confInner{hostname} };

    foreach my $pid ( keys %{$j} ) {
        maybe_show_pid( $j->{$pid}, $pid );
    }
}

sub open_find_pids {
    my $job = shift;

    my $hostname = $confInner{hostname};

    foreach my $rank ( keys( %{ $confInner{orte_data}{$hostname} } ) ) {
        maybe_show_pid( $rank, $confInner{orte_data}{$hostname}{$rank} );
    }
}

sub rms_find_pids {
    my $jobid = shift;

    my %vps;

    my @procs = load_rms_procs($jobid);

    foreach my $proc (@procs) {

        my $vp = $proc->{vp};

        # With any luck we have a new RMS and vp is extracted from /proc
        # Otherwise try and pick it out of the environment in a sane way
        # if that fails report errors for any process with isn't rmsloader

        if ( defined $vp ) {
            debug $vp, "Maybe looking at vp: $vp, pid: $proc->{pid}";
        } else {
            debug undef, "Maybe looking at pid: $proc->{pid}";
        }

        # Strip or rmsloader and slurm[step]d;
        next if ( is_resmgr_process( $proc->{pid} ) );

        # If we aren't known to be the vp and we are not a direct descendant
        # of the resource manager then skip over to the next process.
        next if ( not defined $vp and not is_parent_resmgr( $proc->{pid} ) );

        my $found = "actual";

        if ( not defined $vp ) {
            $found = "likely";

            my %env = get_remote_env( $proc->{pid} );

            if ( defined $env{RMS_PROCID} ) {
                $vp = $env{RMS_PROCID};
            } elsif ( defined $env{SLURM_PROCID} ) {
                $vp = $env{SLURM_PROCID};
            } else {

                debug( undef,
                    "Could not extract vp for process, $proc->{pid} "
                      . ( readlink "/proc/$proc->{pid}/exe" ) );
                next;
            }
        }

        debug( $vp, "Found $found vp $vp, pid: $proc->{pid}" );

        push( @{ $vps{$vp}{$found} }, $proc->{pid} );
    }

    foreach my $vp ( keys %vps ) {
        if ( defined $vps{$vp}{actual} ) {
            foreach my $pid ( @{ $vps{$vp}{actual} } ) {
                maybe_show_pid( $vp, $pid );
            }
        } else {
            foreach my $pid ( @{ $vps{$vp}{likely} } ) {
                maybe_show_pid( $vp, $pid );
            }
        }
    }

}

sub inner_show_stats {
    my $jobid = shift;
    my $key   = ( $jobid << 9 ) - 1;
    run_command( undef,
        "$confInner{edb} --stats-raw --parallel --key=$key $confInner{edbopt}"
    );
    return;
}

sub default_handler_all {
    my ( $cmd, $list ) = @_;
    my %gres;
    foreach my $proc ( @{$list} ) {
        my $vp  = $proc->{vp};
        my $pid = $proc->{pid};
        my $res = $allfns{ $cmd->{mode} }{handler}( $cmd->{cargs}, $vp, $pid );
        $gres{$vp} = $res if ( defined $res );
    }
    return if not %gres;
    return \%gres;
}

# Receive a reply from a child.
# If it's the last reply then combine
# with others and forward to parent.
sub reply_from_child {
    my ( $handle, $sd, $req ) = @_;

    # If it's the first connection over this socket simply
    # foreward on the signon command.
    if ( $req eq "Welcome" ) {
        $sd->{socket}->printf("$handle->{signon_cmd}\n");
        return;
    }

    my $r = my_decode($req);

    # Merge this reply into the local one.
    $handle->{child_replys}++;

    # Combine the host responces.
    foreach my $status ( keys( %{ $r->{host_responce} } ) ) {
        foreach my $host ( keys( %{ $r->{host_responce}{$status} } ) ) {
            $handle->{all_replys}->{host_responce}{$status}{$host} =
              $r->{host_responce}{$status}{$host};
        }
    }

    # Combine the target process responces.
    if ( exists $r->{target_responce} ) {
        foreach my $tp ( keys( %{ $r->{target_responce} } ) ) {
            $handle->{all_replys}->{target_responce}{$tp} =
              $r->{target_responce}{$tp};
        }
    }

    # Combine the target process responces from child.
    if ( exists $r->{target_output} ) {
        foreach my $tp ( keys( %{ $r->{target_output} } ) ) {
            $handle->{all_replys}->{target_output}{$tp} =
              $r->{target_output}{$tp};
        }
    }

    # Copy the target local responces.
    if ( exists $handle->{target_responce} ) {
        foreach my $tp ( keys( %{ $handle->{target_responce} } ) ) {
            $handle->{all_replys}->{target_responce}{$tp} =
              $handle->{target_responce}{$tp};
        }
    }

    # Save any output we've got from this node.
    foreach my $key ( keys(%inner_output) ) {
        $handle->{all_replys}->{target_output}{$key} = $inner_output{$key};
    }

    %inner_output = ();

    # Copy the network target errors into responce.
    if ( exists $r->{target_data} ) {
        if ( exists $handle->{all_replys}->{target_data} ) {
            foreach my $key ( keys( %{ $r->{target_data} } ) ) {
                foreach my $value ( keys( %{ $r->{target_data}{$key} } ) ) {
                    if (
                        defined $handle->{all_replys}
                        ->{target_data}{$key}{$value} )
                    {
                        rng_merge(
                            $handle->{all_replys}->{target_data}{$key}{$value},
                            $r->{target_data}{$key}{$value}
                        );
                    } else {
                        $handle->{all_replys}->{target_data}{$key}{$value} =
                          $r->{target_data}{$key}{$value};
                    }
                }
            }
        } else {
            $handle->{all_replys}->{target_data} = $r->{target_data};
        }
    }

    # Merge in local target responces.
    foreach my $key ( keys(%local_target_data) ) {
        foreach my $value ( keys( %{ $local_target_data{$key} } ) ) {
            if ( defined $handle->{all_replys}->{target_data}{$key}{$value} ) {
                rng_merge( $handle->{all_replys}->{target_data}{$key}{$value},
                    $local_target_data{$key}{$value} );
            } else {
                $handle->{all_replys}->{target_data}{$key}{$value} =
                  $local_target_data{$key}{$value};
            }
        }
    }

    %local_target_data = ();

    # If this isn't the last child to signon don't reply up-stream yet.
    if ( $handle->{child_replys} != $handle->{children} ) {
        return;
    }

    # Send the data upstream.
    my $reply = $handle->{all_replys};

    reply_to_parent( $handle, $reply );
    if ( $handle->{shutdown} ) {
        inner_cleanup_and_exit($handle);
    }

    # Reset local data.
    $handle->{all_replys}      = undef;
    $handle->{child_replys}    = 0;
    $handle->{target_responce} = undef;
}

# Convert from a pid to a command name and do it in a safe manner to avoid
# warnings.  suid programs tend to have the exe link which is un-readable
# so if that yeilds nothing then load the name from the status file.
sub pid_to_name {
    my $pid = shift;
    my $exe = readlink("/proc/$pid/exe");
    my $cmd;
    if ( defined $exe ) {
        return basename($exe);
    } else {
        return find_from_status( $pid, "Name" );
    }
}

# Take the resource manager list of pids and possibly convert these into
# more interesting pids, in particular look for pids which appear to be
# scripts and, if they have any children, look at the children instead.
sub convert_pids_to_child_pids {

    opendir( DIR, "/proc/" );
    my @pids = readdir(DIR);
    closedir(DIR);

    my $uid = $<;

    my %scripts;
    map { $scripts{$_}++ } split( ",", $confInner{scripts} );

    my $ipids = $confInner{rmpids};

    foreach my $pid (@pids) {

        # Ignore entries that aren't numeric.
        next unless ( $pid =~ /^\d+$/ );

        # Ignore processes with the wrong ownership.
        my ( undef, undef, undef, undef, $owner ) = stat("/proc/$pid");
        next unless $owner == $uid;

        # The resource manager pid this pid is associated with.
        my $rmpid;

        if ( defined $ipids->{$pid} ) {
            $rmpid = $pid;
        } else {
            my $ppid = find_from_status( $pid, "PPid" );

            while ( defined $ppid and $ppid != 1 ) {
                if ( defined $ipids->{$ppid} ) {
                    $rmpid = $ppid;
                    $ppid  = undef;
                } else {
                    $ppid = find_from_status( $ppid, "PPid" );
                }
            }
        }

        next unless defined $rmpid;

        if ( defined( $scripts{ pid_to_name($pid) } ) ) {
            push( @{ $ipids->{$rmpid}{scripts} }, $pid );
        } else {
            push( @{ $ipids->{$rmpid}{notscripts} }, $pid );
        }
    }

    # Now chose what pid to target.
    foreach my $key ( keys( %{$ipids} ) ) {
        my $ip = $ipids->{$key};

        my $newpid;

        if ( defined( $ip->{scripts} ) ) {
            my @ppids = sort( @{ $ip->{scripts} } );
            $newpid = $ppids[0];
        }

        # If there are any pids which aren't scripts then target the
        # first one.
        if ( defined( $ip->{notscripts} ) ) {
            my @ppids = sort( @{ $ip->{notscripts} } );
            $newpid = $ppids[0];
        }
        my %pd;
        $pd{pid} = $newpid;
        $pd{vp}  = $ip->{rank};
        push( @{ $confInner{all_pids} }, \%pd );

    }

}

# Find and report pids as part of the signon protocol, we should
# also report name
sub inner_find_pids {
    my ( $netdata, $cmd ) = @_;

    if ( $confInner{rmgr} eq "orte" ) {
        $confInner{orte_data} = $cmd->{orte_data};
    }

    # Query the resource manager to find the pids,
    # they'll be added to the "all_pids" array.
    $rmgr{ $confInner{rmgr} }{find_pids}( $confInner{jobid} );

    convert_pids_to_child_pids();

    foreach my $proc ( @{ $confInner{all_pids} } ) {
        my $pid   = $proc->{pid};
        my $vp    = $proc->{vp};
        my $name  = readlink("/proc/$pid/exe");
        my $state = find_from_status( $pid, "State" );
        target_key_pair( $vp, "found", "yes" );
        target_key_pair( $vp, "name",  $name );
        target_key_pair( $vp, "state", $state );
    }
}

# Receive a command (perl reference) from our parent.
#
# When we receive a command:
# 1) Send it on to our children.
# 2) Execute it.
# 3) If we have no children send reply.
sub command_from_parent {
    my ( $netdata, $cmd ) = @_;

    if ( $cmd->{mode} eq "signon" ) {
        $netdata->{signon_cmd} = my_encode($cmd);

        # Setup the environment.
        foreach my $key ( keys( %{ $cmd->{cinner} } ) ) {
            $confInner{$key} = $cmd->{cinner}{$key};
        }

        if (
            not
            exists $cmd->{connection_tree}{ $confInner{hostname} }{children} )
        {
            $netdata->{children} = 0;
            inner_find_pids( $netdata, $cmd );
            return;
        }

        my @children =
          @{ $cmd->{connection_tree}{ $confInner{hostname} }{children} };
        $netdata->{children} = $#children + 1;

        # Only one child is tested so far.
        foreach my $chostname (@children) {
            my $socket = connect_to_child(
                $chostname,
                $cmd->{remote}{$chostname}{port},
                $cmd->{remote}{$chostname}{key}
            );
            my %cdata;
            $cdata{socket}   = $socket;
            $cdata{hostname} = $chostname;
            $cdata{line_cb}  = \&reply_from_child;
            $cdata{state}    = "init";
            $netdata->{sel}->add($socket);
            $netdata->{connections}{$socket} = \%cdata;
            push @{ $netdata->{child_sockets} }, $socket;
        }
        inner_find_pids( $netdata, $cmd );
        return;
    }

    # Forward on to our children before doing any more processing.
    if ( $netdata->{children} ) {
        my $req = my_encode($cmd) . "\n";
        foreach my $child ( @{ $netdata->{child_sockets} } ) {
            $child->printf($req);
            $child->flush();
        }
    }

    if ( $cmd->{mode} eq "exit" ) {
        $netdata->{shutdown} = 1;
        return;
    }

    $confInner{mode} = $cmd->{mode};

    my $pid_list;

    # If supplied with a rank list then use it now to generate a list of
    # processes to inspect.
    if ( exists( $cmd->{ranks} ) ) {
        my $rng = rng_dup( $cmd->{ranks} );

        # Loop over ranks first as there are potentially more of them.
        while ( defined( my $rank = rng_shift($rng) ) ) {
            foreach my $proc ( @{ $confInner{all_pids} } ) {
                my $vp  = $proc->{vp};
                my $pid = $proc->{pid};
                if ( $vp == $rank ) {
                    push @{$pid_list}, $proc;
                }
            }
        }
    } else {
        $pid_list = $confInner{all_pids};
    }

    # Now do the work.
    my $res;
    if ( defined $allfns{ $cmd->{mode} }{handler_all} ) {
        $res = $allfns{ $cmd->{mode} }{handler_all}( $cmd->{cargs}, $pid_list );
    } else {
        $res = default_handler_all( $cmd, $pid_list );
    }
    if ($res) {
        $netdata->{target_responce} = $res;
    }

    return;
}

# Time for the inner process to exit, cleanup all sockets and
# quit.
sub inner_cleanup_and_exit {
    my $netdata = shift;
    foreach my $h ( $netdata->{sel}->handles() ) {
        $h->flush();
        $h->close();
    }
    exit(0);
}

# Send a reply to our parent, put a status of "ok" on for this
# host.
sub reply_to_parent {
    my ( $netdata, $cmd ) = @_;

    $cmd->{host_responce}{ok}{ $confInner{hostname} } = 1;

    my $reply = my_encode($cmd);
    $netdata->{parent}->{socket}->print("$reply\n");
}

# Process a single line of input onto a socket we are
# listening on.  This is probably our parent (who may
# be the outer process) but it needs to be authenticated.
sub command_from_outer {
    my ( $netdata, $cdata, $line ) = @_;

    my $s = $cdata->{socket};
    if ( not $cdata->{trusted} ) {
        if ( $line eq "hello $netdata->{key}" ) {

            $cdata->{trusted} = 1;
            $cdata->{str}     = "";
            $s->printf("Welcome\n");
            $netdata->{parent} = $cdata;
        } elsif ( $line eq "debug" ) {
            my $r = Dumper($netdata);
            $s->printf($r);
            $s->flush();
            $netdata->{sel}->remove($s);
            $s->close();
            $cdata->{dead} = 1;
            print("debug\n");
        } else {
            printf("Closing connection from $cdata->{desc} (Bad signon)\n");
            $netdata->{sel}->remove($s);
            $s->close();
            $cdata->{dead} = 1;
        }
        return;
    }

    command_from_parent( $netdata, my_decode($line) );

    if ( $netdata->{children} == 0 ) {
        my $res;
        if ( defined $netdata->{target_responce} ) {
            $res->{target_responce} = $netdata->{target_responce};
        }

        # Save any output we've got from this node.
        foreach my $key ( keys(%inner_output) ) {
            $res->{target_output}{$key} = $inner_output{$key};
        }

        if (%local_target_data) {
            $res->{target_data} = \%local_target_data;
        }

        reply_to_parent( $netdata, $res );

        # Clear down the local inputs.
        %inner_output               = ();
        %local_target_data          = ();
        $netdata->{target_responce} = undef;

        if ( $netdata->{shutdown} ) {
            inner_cleanup_and_exit($netdata);
        }
    }
}

# Loop forever in the inner process.
sub inner_loop_for_comms {
    my ($outerloc) = @_;

    my $server = create_local_port( $confInner{port_range} );

    my $lport    = $server->sockport();
    my $hostname = $confInner{hostname};
    my $key      = rand();

    if ( defined $outerloc ) {
        my ( $ohost, $oport ) = split( ":", $outerloc );
        my $os = IO::Socket::INET->new(
            PeerAddr => $ohost,
            PeerPort => $oport,
            Proto    => 'tcp',
        ) or confess("Failed to connect to outer");
        my $secret = find_padb_secret();
        die("No secret") if not defined $secret;
        $os->print("Hello $secret $hostname $lport $key\n");
        $os->close();
    } else {

    # For now just print the signon code to stdout and let the outer pick it up.
        my $signon_text = "connect $hostname $lport $key\n";
        print($signon_text);
    }

    my $netdata;
    $netdata->{sel} = IO::Select->new();
    $netdata->{sel}->add($server);
    $netdata->{server}   = $server;
    $netdata->{key}      = $key;
    $netdata->{shutdown} = 0;

    my $sel = $netdata->{sel};

    my $stime = time();

    while ( $sel->count() > 0 ) {
        while ( my @data = $sel->can_read(5) ) {
            foreach my $s (@data) {
                if ( $s == $server ) {
                    my $new = $server->accept() or confess("Failed accept");
                    $sel->add($new);
                    my $peer = getpeername($new);
                    my ( $port, $addr ) = unpack_sockaddr_in($peer);
                    my $ip = inet_ntoa($addr);
                    my $hostname = gethostbyaddr( $addr, AF_INET );

                    my %sinfo;
                    $sinfo{hostname}              = $hostname;
                    $sinfo{trusted}               = 0;
                    $sinfo{port}                  = $port;
                    $sinfo{desc}                  = "$hostname:$port";
                    $sinfo{socket}                = $new;
                    $sinfo{line_cb}               = \&command_from_outer;
                    $netdata->{connections}{$new} = \%sinfo;
                    next;
                }

                my $sinfo = $netdata->{connections}{$s};
                my $d;
                my $count = sysread( $s, $d, 65536 );

                # Dead connection.
                if ( not defined $d or $count eq 0 ) {

                    # printf("null read from $sinfo->{desc}\n");
                    if ( eof($s) ) {
                        $sel->remove($s);
                        $s->close();
                        $sinfo->{trusted} = 0;
                        $sinfo->{dead}    = 1;
                        my $scount = $sel->count();
                    }
                    next;
                }

                $sinfo->{str} .= $d;
                extract_line( $netdata, $sinfo );

            }
        }
        my $time = time();

       # Should probably handle this better, if the outer or tree never signons
       # for whatever reason silently die as it's probably the best thing do to.
        if ( ( $sel->count() == 1 ) and ( ( $time - $stime ) > 30 ) ) {
            exit(0);
        }
    }
    my $count = $sel->count();
    printf("Thats not supposed to happen count=($count)\n");

}

sub inner_main {

    $confInner{hostname} = hostname();

    # Load the inner config options, the defaults are the same
    # as the outer config options so just load them as they are
    # set normally.  Use the @inner_conf and @inner_conf_cmd
    # lists to decide which ones to copy.
    # If any of these options are set then the outer process
    # will forward on any changes as part of the setup procedure.
    foreach my $conf (@inner_conf) {
        $confInner{$conf} = $conf{$conf};
    }

    foreach my $conf (@inner_conf_cmd) {
        $confInner{$conf} = $conf{$conf};
    }

    # Over-ride the defaults for these two as minfo might not
    # exist on the front end.
    $confInner{edb}   = find_edb();
    $confInner{minfo} = find_minfo();

    # Load the command line options.
    my %optionhash;

    map { $optionhash{"$_=s"} = \$confInner{$_} } @inner_conf_cmd;

    GetOptions(%optionhash) or confess("could not parse options\n");

    $confInner{myld} = $ENV{LD_LIBRARY_PATH};

    inner_loop_for_comms( $confInner{outer} );
    exit(0);

}

###############################################################################
#
# Main.
#
###############################################################################

# Initialise (some of) the options which are common to both the
# inner and outer instances of padb.  Attempt to make it easy
# to add new options by keeping everything in one place.
#
# Additional work is needed to make this 100% consistent, some
# of these options have secondary options (e.g. --kill and --signal) and
# this isn't dealt with yet.
#
# stack_long has a special case later on which adds two extra handlers
# in the inner code, this could be replaced by prehandler and posthandler
# but it's the only code that needs it so far.

sub to_arg {
    my $arg = shift;
    my $res = "$arg->{arg_long}";
    $res =~ s/\_/-/g;
    if ( defined $arg->{arg_short} ) {
        $res .= "|$arg->{arg_short}";
    }
    if ( defined $arg->{type} ) {
        $res .= $arg->{type};
    }
    return $res;
}

sub common_main {

    # The quasi-authorative list of modes padb can operate in.

    # Notes on the callback functions and paramaters.

    # handler     Called in the inner for each target process.
    # param:      ??, $vp, $pid

# handler_all Called once in the the inner and should iterate over each target process.
#             ??, $vp, $pid

# These two functions can eitehr return a value, and have it passed to the output handler
# or call output() and use the default_output_handler().

    # out_handler Called once in the outer to display the output
    # pre_out_handler Called once in the outer to display any header.

    # TODO:
    # --mode=<mode> on the command line?
    # Sort out secondary and options_i so they are handled in the same way.

    $allfns{queue} = {
        'arg_long'  => "message-queue",
        'qsnet'     => 1,
        'arg_short' => "q",
        'handler'   => \&show_queue,
        'help'      => "Show the message queues",
        'options_i' => { "mpi_dll" => undef, }

    };

    $allfns{kill} = {
        'handler'   => \&kill_proc,
        'arg_long'  => 'kill',
        'help'      => "Deliver signal to processes",
        'secondary' => [
            {
                'arg_long' => 'signal',
                'type'     => '=s',
                'default'  => 'TERM'
            }
        ]
    };

    $allfns{mqueue} = {
        'handler_all' => \&show_mpi_queue_all,
        'arg_long'    => 'mpi-queue',
        'arg_short'   => 'Q',
        'help'        => "Show MPI message queues",
        'options_i'   => { "mpi_dll" => undef, }
    };

    $allfns{deadlock} = {
        'handler_all' => \&show_mpi_queue_for_deadlock_all,
        'arg_long'    => 'deadlock',
        'arg_short'   => 'j',
        'help'        => "Run deadlock detection algorithm",
        'out_handler' => \&deadlock_detect,
        'options_i'   => {
            "mpi_dll"            => undef,
            "show_group_members" => 0,
            "show_all_groups"    => 0,
          }

    };

    $allfns{pinfo} = {
        'handler_all' => \&show_proc_all,
        'arg_long'    => 'proc-info',
        'help'        => "Show process information",
        'options_i'   => {
            "proc_shows_proc" => 1,
            "proc_shows_fds"  => 1,
            "proc_shows_maps" => 0,
            "proc_shows_stat" => 0
          }

    };

    $allfns{proc_summary} = {
        'handler_all' => \&show_proc_all,
        'out_handler' => \&show_proc_format,
        'arg_long'    => 'proc-summary',
        'help'        => "Show process information in top format",
        'options_i'   => {
            "column_seperator"   => "  ",
            "proc_shows_proc"    => 1,
            "proc_shows_fds"     => 0,
            "proc_shows_maps"    => 0,
            "proc_shows_stat"    => 1,
            "proc_sort_key"      => "vp",
            "proc_show_header"   => 1,
            "reverse_sort_order" => 0,
            "nprocs_output"      => undef,
        },
        'secondary' => [
            {
                'arg_long' => 'proc_format',
                'type'     => '=s',
                'default' =>
'vp=vpid,hostname,pid,vmsize,vmrss,stat.state=S,load1=uptime,pcpu=%cpu,stat.processor=lcore,name=command'
            }
          ]

    };

    $allfns{stack} = {
        'handler_all' => \&stack_trace_from_pids,
        'arg_long'    => 'stack-trace',
        'arg_short'   => 'x',
        'help'        => "Show stack trace (see also -t)",
        'options_i'   => {
            "stack_shows_params" => 0,
            "stack_shows_locals" => 0,
            "gdb_retry_count"    => 3,
            "stack_strip_above" =>
"elan_waitWord,elan_pollWord,elan_deviceCheck,opal_condition_wait,opal_progress",
            "stack_strip_below" => "main",
        },
        'secondary' => [
            {
                'arg_long' => 'strip_below_main',
                'type'     => '!',
                'default'  => 1,
            },
            {
                'arg_long' => 'strip_above_wait',
                'type'     => '!',
                'default'  => 1,
            },
          ]

    };

    $allfns{stack_long} = {
        'handler_all' => \&show_full_stacks,
        'arg_long'    => 'stack-trace-full',
        'arg_short'   => 'X',
        'help'        => "Show long stack trace (with locals)",
    };

    $allfns{mpi_watch} = {
        'handler_all'     => \&mpi_watch_all,
        'arg_long'        => 'mpi-watch',
        'help'            => "Trace MPI programs",
        'pre_out_handler' => \&pre_mpi_watch,
        'out_handler'     => \&show_mpi_watch,
        'options_i'       => {
            "mpi_dll"        => undef,
            "mpi_watch_file" => undef
        }
    };

    $allfns{ping} = {
        'handler'  => \&ping_rank,
        'arg_long' => 'ping',
        'help'     => "Internal ping",
    };

    $allfns{set_debug} = {
        'handler'   => \&set_debug,
        'qsnet'     => 1,
        'arg_long'  => 'set-debug',
        'arg_short' => 'D',
        'help'      => "Set debug flags (use --dflag=value)",
        'secondary' => [
            {
                'arg_long' => 'dflag',
                'type'     => '=s',
                'default'  => '0'
            }
        ]
    };

    # These next two don't work currently pending access to a QsNet system
    # for testing.  In the new full-duplex world startup is a little different
    # and these functions need updating.
    # In particular the following need to be addressed.
    # the callback paramaters are probably wrong.
    # The shared memory key needs to be calculated.
    # Config options need to be read locally rather than globally
    $allfns{qsnet_stats} = {
        'handler_all' => \&inner_show_stats,
        'out_handler' => \&show_stats,
        'qsnet'       => 1,
        'arg_long'    => 'statistics-total',
        'arg_short'   => 's',
        'help'        => "Show the job-wide statistics.",
        'options_i'   => {
            "stats_name"     => undef,
            "stats_sort_key" => "vp",
            "stats_reverse"  => 0,
            "stats_short"    => 0,
            "show_all_stats" => 0,
        }
    };
    $allfns{qsnet_groups} = {
        'handler_all' => \&inner_show_stats,
        'out_handler' => \&group_status,
        'qsnet'       => 1,
        'arg_long'    => 'group',
        'arg_short'   => 'g',
        'help'        => "Show the state of collective operations (groups).",
        'options_i'   => {
            "show_group_members" => 0,
            "show_all_groups"    => 0,
        }
    };

    # Make a getopt string out of each of the optional options.
    foreach my $arg ( keys %allfns ) {
        $allfns{$arg}{arg} = to_arg( $allfns{$arg} );

        if ( defined $allfns{$arg}{secondary} ) {
            foreach my $sec ( @{ $allfns{$arg}{secondary} } ) {
                $sec->{arg} = to_arg($sec);
            }
        }
    }
}

# Now run some actual code.

common_main();

if ( $#ARGV >= 0 and $ARGV[0] eq "--inner" ) {
    shift @ARGV;
    inner_main();
} else {
    outer_main();
}

exit(0);
